
Learn How To Use DynamoDB Streams With AWS Lambda and Go

This blog post will help you get quickly started with DynamoDB Streams and AWS Lambda using Go. It will cover how to deploy the entire solution using AWS CDK.

By Abhishek Gupta · Jul. 21, 22 · Tutorial


The use case presented here is pretty simple. There are a couple of DynamoDB tables, and the goal is to capture the data in one of them (the source table) and replicate it to another (the target table) so that it can serve different queries. To demonstrate an end-to-end flow, there is also an Amazon API Gateway that fronts a Lambda function, which persists data in the source DynamoDB table. Changes to this table trigger another Lambda function (thanks to DynamoDB Streams), which finally replicates the data into the target table.

High-level architecture diagram

A Global or Local Secondary Index offers a similar capability.

Now that you have an overview of what we are trying to achieve here...

Let’s Get Right to It!

Before you proceed, make sure you have the Go programming language (v1.16 or higher) and AWS CDK installed.

Clone the project and change to the right directory:

git clone https://github.com/abhirockzz/dynamodb-streams-lambda-golang

cd cdk


To Start the Deployment:

All you need to do is run a single command (cdk deploy), and wait for a bit. You will see a list of resources that will be created and will need to provide your confirmation to proceed.

Don't worry: in the next section, I will explain what's happening.

cdk deploy

# output

Bundling asset DynamoDBStreamsLambdaGolangStack/ddb-streams-function/Code/Stage...

✨  Synthesis time: 5.94s

This deployment will make potentially sensitive changes according to your current security approval level (--require-approval broadening).
Please confirm you intend to make the following modifications:

//.... omitted

Do you wish to deploy these changes (y/n)? y


This will start creating the AWS resources required for our application.

If you want to see the AWS CloudFormation template which will be used behind the scenes, run cdk synth and check the cdk.out folder.

You can keep track of the progress in the terminal or navigate to the AWS console: CloudFormation > Stacks > DynamoDBStreamsLambdaGolangStack

AWS CloudFormation Stack

Once all the resources are created, you can try out the application. You should have:

  • Two Lambda functions
  • Two DynamoDB tables (source and target)
  • One API Gateway (along with a route and an integration)
  • A few others (like IAM roles, etc.)

Before you proceed, get the API Gateway endpoint that you will need to use. It's available in the stack output (in the terminal or the Outputs tab in the AWS CloudFormation console for your Stack):

cdk deploy output

End-to-End Solution

Start by creating a few users in the (source) DynamoDB table.

To do this, invoke the API Gateway (HTTP) endpoint with the appropriate JSON payload:

# export the API Gateway endpoint
export APIGW_ENDPOINT=<replace with API gateway endpoint above>

# for example:
export APIGW_ENDPOINT=https://gy8gxsx9x7.execute-api.us-east-1.amazonaws.com/

# invoke the endpoint with JSON data

curl -i -X POST -d '{"email":"user1@foo.com", "state":"New York","city":"Brooklyn","zipcode": "11208"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT

curl -i -X POST -d '{"email":"user2@foo.com", "state":"New York","city":"Staten Island","zipcode": "10314"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT

curl -i -X POST -d '{"email":"user3@foo.com", "state":"Ohio","city":"Cincinnati","zipcode": "45201"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT

curl -i -X POST -d '{"email":"user4@foo.com", "state":"Ohio","city":"Cleveland","zipcode": "44101"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT

curl -i -X POST -d '{"email":"user5@foo.com", "state":"California","city":"Los Angeles","zipcode": "90003"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT


Navigate to the DynamoDB table in the AWS console and ensure that records have been created:

If you have the AWS CLI handy, you can also try aws dynamodb scan --table-name <name of table>.

DynamoDB source table

If all goes well, our replication function should also work. To confirm, you need to check the target DynamoDB table.

DynamoDB target table

Notice that the zipcode attribute is missing. This is done on purpose for this demo. You can pick and choose the attributes you want to include in the target table and write your function logic accordingly.

The target DynamoDB table has state as the partition key and city as the sort key, so you can query it in a different way (compared to the source table, which you could query only by email).

Don’t Forget To Clean Up!

Once you're done, to delete all the services, simply use:

cdk destroy

#output prompt (choose 'y' to continue)

Are you sure you want to delete: DynamoDBStreamsLambdaGolangStack (y/n)?


Awesome! You were able to set up and try the complete solution. Before we wrap up, let's quickly walk through some of the important parts of the code to get a better understanding of what's going on behind the scenes.

Code Walk-Through

Since we will only focus on the important bits, a lot of the code (print statements, error handling, etc.) is omitted/commented out for brevity.

Infrastructure-as-Code With AWS CDK and Go!

You can refer to the CDK code here.

We start by creating a DynamoDB table and ensure that DynamoDB Streams is enabled.

    sourceDynamoDBTable := awsdynamodb.NewTable(stack, jsii.String("source-dynamodb-table"),
        &awsdynamodb.TableProps{
            PartitionKey: &awsdynamodb.Attribute{
                Name: jsii.String("email"),
                Type: awsdynamodb.AttributeType_STRING},
            Stream: awsdynamodb.StreamViewType_NEW_AND_OLD_IMAGES})

    sourceDynamoDBTable.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY)


Then, we define the Lambda function (CDK will take care of building and deploying it) and make sure we grant it the appropriate permissions to write to the DynamoDB table.

    createUserFunction := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("create-function"),
        &awscdklambdagoalpha.GoFunctionProps{
            Runtime:     awslambda.Runtime_GO_1_X(),
            Environment: &map[string]*string{envVarName: sourceDynamoDBTable.TableName()},
            Entry:       jsii.String(createFunctionDir)})

    sourceDynamoDBTable.GrantWriteData(createUserFunction)


The API Gateway (HTTP API) is created, along with the Lambda function integration and the appropriate route.

    api := awscdkapigatewayv2alpha.NewHttpApi(stack, jsii.String("http-api"), nil)

    createFunctionIntg := awscdkapigatewayv2integrationsalpha.NewHttpLambdaIntegration(jsii.String("create-function-integration"), createUserFunction, nil)

    api.AddRoutes(&awscdkapigatewayv2alpha.AddRoutesOptions{
        Path:        jsii.String("/"),
        Methods:     &[]awscdkapigatewayv2alpha.HttpMethod{awscdkapigatewayv2alpha.HttpMethod_POST},
        Integration: createFunctionIntg})


We also need the target DynamoDB table. Note that this table has a composite Primary Key (state and city):

    targetDynamoDBTable := awsdynamodb.NewTable(stack, jsii.String("target-dynamodb-table"),
        &awsdynamodb.TableProps{
            PartitionKey: &awsdynamodb.Attribute{
                Name: jsii.String("state"),
                Type: awsdynamodb.AttributeType_STRING},
            SortKey: &awsdynamodb.Attribute{
                Name: jsii.String("city"),
                Type: awsdynamodb.AttributeType_STRING},
        })

    targetDynamoDBTable.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY)


Finally, we create the second Lambda function, which is responsible for data replication, grant it the right permissions, and, most importantly, add the source DynamoDB table as its event source.

    replicateUserFunction := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("replicate-function"),
        &awscdklambdagoalpha.GoFunctionProps{
            Runtime:     awslambda.Runtime_GO_1_X(),
            Environment: &map[string]*string{envVarName: targetDynamoDBTable.TableName()},
            Entry:       jsii.String(replicateFunctionDir)})

    replicateUserFunction.AddEventSource(awslambdaeventsources.NewDynamoEventSource(sourceDynamoDBTable, &awslambdaeventsources.DynamoEventSourceProps{StartingPosition: awslambda.StartingPosition_LATEST, Enabled: jsii.Bool(true)}))

    targetDynamoDBTable.GrantWriteData(replicateUserFunction)


Lambda Function: Create User

You can refer to the Lambda Function code here.

The function logic is pretty straightforward: it converts the incoming JSON payload into a Go struct and then invokes the DynamoDB PutItem API to persist the data.

func handler(ctx context.Context, req events.APIGatewayV2HTTPRequest) (events.APIGatewayV2HTTPResponse, error) {
    payload := req.Body
    var user User

    err := json.Unmarshal([]byte(payload), &user)
    if err != nil {//..handle}

    item := make(map[string]types.AttributeValue)

    item["email"] = &types.AttributeValueMemberS{Value: user.Email}
    item["state"] = &types.AttributeValueMemberS{Value: user.State}
    item["city"] = &types.AttributeValueMemberS{Value: user.City}
    item["zipcode"] = &types.AttributeValueMemberN{Value: user.Zipcode}
    item["active"] = &types.AttributeValueMemberBOOL{Value: true}

    _, err = client.PutItem(context.Background(), &dynamodb.PutItemInput{
        TableName: aws.String(table),
        Item:      item,
    })

    if err != nil {//..handle}
    return events.APIGatewayV2HTTPResponse{StatusCode: http.StatusCreated}, nil
}


Lambda Function: Replicate Data

You can refer to the Lambda Function code here.

The handler for the data replication function accepts a DynamoDBEvent as a parameter. It extracts the newly added record and builds a new item that can be saved to the target DynamoDB table. The data type of each attribute is checked and handled accordingly. Although the code only shows String and Boolean types, the same approach can be extended to other DynamoDB data types such as Numbers, Maps, Sets, etc.

func handler(ctx context.Context, e events.DynamoDBEvent) {
    for _, r := range e.Records {
        item := make(map[string]types.AttributeValue)

        for k, v := range r.Change.NewImage {
            if v.DataType() == events.DataTypeString {
                item[k] = &types.AttributeValueMemberS{Value: v.String()}
            } else if v.DataType() == events.DataTypeBoolean {
                item[k] = &types.AttributeValueMemberBOOL{Value: v.Boolean()}
            }
        }

        _, err := client.PutItem(context.Background(), &dynamodb.PutItemInput{
            TableName: aws.String(table),
            Item:      item})

        if err != nil {//..handle}
    }
}


Here are some things you can try out:

  • Insert more data in the source table. Look for ways to do bulk inserts into a DynamoDB table.
  • Execute queries in the target table based on state, city or both.

Wrap Up

In this blog post, you saw a simple example of how to react to table data changes in near real time using a combination of DynamoDB Streams and Lambda functions. You also used AWS CDK to deploy the entire infrastructure, including the API Gateway, Lambda functions, DynamoDB tables, integrations, and Lambda event source mappings.

All this was done using the Go programming language, which is very well supported in DynamoDB, AWS Lambda, and AWS CDK.

Happy building!


Published at DZone with permission of Abhishek Gupta, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
