Use Golang for Data Processing With Amazon Kinesis and AWS Lambda

Are you interested in learning how to use Golang and AWS Lambda to build a serverless solution? Learn more in this tutorial.

by Abhishek Gupta · Mar. 14, 23 · Tutorial
This blog post is for folks interested in learning how to use Golang and AWS Lambda to build a serverless solution. You will be using the aws-lambda-go library along with the AWS Go SDK v2 for an application that will process records from an Amazon Kinesis data stream and store them in a DynamoDB table. But that's not all! You will also use Go bindings for AWS CDK to implement "Infrastructure-as-code" for the entire solution and deploy it with the AWS CDK CLI.

Introduction

Amazon Kinesis is a platform for real-time data processing, ingestion, and analysis. Kinesis Data Streams is a serverless streaming data service (part of the Kinesis streaming data platform, along with Kinesis Data Firehose, Kinesis Video Streams, and Kinesis Data Analytics) that enables developers to collect, process, and analyze large amounts of data in real-time from various sources such as social media, IoT devices, logs, and more. AWS Lambda, on the other hand, is a serverless compute service that allows developers to run their code without having to manage the underlying infrastructure.

The integration of Amazon Kinesis with AWS Lambda provides an efficient way to process and analyze large data streams in real time. A Kinesis data stream is a set of shards, and each shard contains a sequence of data records. A Lambda function can act as a consumer application and process data from a Kinesis data stream. You can map a Lambda function to a shared-throughput consumer (standard iterator), or to a dedicated-throughput consumer with enhanced fan-out. For standard iterators, Lambda polls each shard in your Kinesis stream for records over HTTP, and the event source mapping shares read throughput with other consumers of the shard.
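To make the relationship between records and shards more concrete: Kinesis hashes each record's partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The following is only a rough sketch of that idea. The helper name shardFor and the assumption that the hash key space is split evenly across shards are illustrative and not taken from the article's code; a real stream reports its actual ranges per shard.

```go
package main

import (
	"crypto/md5"
	"fmt"
	"math/big"
)

// shardFor returns the index of the shard whose hash key range would contain
// the MD5 hash of partitionKey. Assumption: the 128-bit hash key space is
// divided evenly among shardCount shards (shardCount must be > 0).
func shardFor(partitionKey string, shardCount int64) int64 {
	sum := md5.Sum([]byte(partitionKey))
	hash := new(big.Int).SetBytes(sum[:]) // 128-bit unsigned integer

	// Width of each shard's range: 2^128 / shardCount.
	space := new(big.Int).Lsh(big.NewInt(1), 128)
	width := new(big.Int).Div(space, big.NewInt(shardCount))

	idx := new(big.Int).Div(hash, width).Int64()
	if idx >= shardCount {
		// The remainder of an uneven split falls into the last shard.
		idx = shardCount - 1
	}
	return idx
}

func main() {
	keys := []string{"user1@foo.com", "user2@foo.com", "user3@foo.com"}
	for _, key := range keys {
		fmt.Printf("%s -> shard %d\n", key, shardFor(key, 4))
	}
}
```

Because the mapping depends only on the partition key, records that share a key always land on the same shard, which is what preserves their relative order for a consumer such as a Lambda function.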

Amazon Kinesis and AWS Lambda can be used together to build many solutions, including real-time analytics (allowing businesses to make informed decisions), log processing (using logs to proactively identify and address issues in servers and applications before they become critical), IoT data processing (analyzing device data in real time and triggering actions based on the results), clickstream analysis (providing insights into user behavior), fraud detection (detecting and preventing fraudulent card transactions), and more.

As always, the code is available on GitHub.

Kinesis-Lambda-DynamoDB flow

Prerequisites

Before you proceed, make sure you have the Go programming language (v1.18 or higher) and AWS CDK installed.

Clone the GitHub repository and change to the right directory:

git clone https://github.com/abhirockzz/kinesis-lambda-events-golang

cd kinesis-lambda-events-golang


Use AWS CDK To Deploy the Solution

To start the deployment, invoke cdk deploy and wait for a bit. You will see a list of the resources that will be created and will be asked to confirm before the deployment proceeds.

cd cdk

cdk deploy

# output

Bundling asset KinesisLambdaGolangStack/kinesis-function/Code/Stage...

✨  Synthesis time: 5.94s

This deployment will make potentially sensitive changes according to your current security approval level (--require-approval broadening).
Please confirm you intend to make the following modifications:

//.... omitted

Do you wish to deploy these changes (y/n)? y


This will start creating the AWS resources required for our application.

If you want to see the AWS CloudFormation template which will be used behind the scenes, run cdk synth and check the cdk.out folder.

You can keep track of the progress in the terminal or navigate to the AWS console: CloudFormation > Stacks > KinesisLambdaGolangStack. 

Once all the resources are created, you can try out the application. You should have:

  • A Lambda function
  • A Kinesis stream
  • A DynamoDB table
  • Along with a few other components (like IAM roles, etc.)

Verify the Solution

You can check the table and Kinesis stream info in the stack output (in the terminal or the Outputs tab in the AWS CloudFormation console for your Stack):

KinesisLambdaGolangStack

Publish a few messages to the Kinesis stream. For the purposes of this demo, you can use the AWS CLI:

export KINESIS_STREAM=<enter the Kinesis stream name from cloudformation output>

aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user1@foo.com --data $(echo -n '{"name":"user1", "city":"seattle"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user2@foo.com --data $(echo -n '{"name":"user2", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user3@foo.com --data $(echo -n '{"name":"user3", "city":"new york"}' | base64)
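The commands above base64-encode each JSON payload before sending it. On the consuming side, a Go program that parses a JSON document into a []byte field gets the decoded bytes back, because encoding/json treats []byte fields as base64 strings. The sketch below illustrates that round trip with the standard library only. The kinesisRecord type and both helper functions are hypothetical stand-ins made up for this illustration; they are not the aws-lambda-go types, and the JSON layout is simplified.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// kinesisRecord is a hypothetical, trimmed-down stand-in for a stream record.
type kinesisRecord struct {
	PartitionKey string `json:"partitionKey"`
	Data         []byte `json:"data"` // encoding/json decodes base64 strings into []byte
}

// encodePayload mirrors `echo -n '<json>' | base64` from the commands above.
func encodePayload(payload string) string {
	return base64.StdEncoding.EncodeToString([]byte(payload))
}

// decodeRecord parses a JSON-encoded record; Data comes back already decoded.
func decodeRecord(raw []byte) (kinesisRecord, error) {
	var rec kinesisRecord
	err := json.Unmarshal(raw, &rec)
	return rec, err
}

func main() {
	encoded := encodePayload(`{"name":"user1", "city":"seattle"}`)
	raw := fmt.Sprintf(`{"partitionKey":"user1@foo.com","data":%q}`, encoded)

	rec, err := decodeRecord([]byte(raw))
	if err != nil {
		panic(err)
	}
	fmt.Printf("key=%s payload=%s\n", rec.PartitionKey, rec.Data)
}
```

This is why the handler shown later in the walkthrough can pass the record data straight to json.Unmarshal without an explicit base64 decoding step.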


Check the DynamoDB table to confirm that the user records have been stored. You can use the AWS console or the AWS CLI: aws dynamodb scan --table-name <enter the table name from cloudformation output>.

Don’t Forget To Clean Up

Once you're done, delete all the resources created by the stack:

cdk destroy

#output prompt (choose 'y' to continue)

Are you sure you want to delete: KinesisLambdaGolangStack (y/n)?


You were able to set up and try the complete solution. Before we wrap up, let's quickly walk through some of the important parts of the code to get a better understanding of what's going on behind the scenes.

Code Walkthrough

Some of the code (error handling, logging, etc.) has been omitted for brevity since we only want to focus on the important parts.

AWS CDK

You can refer to the CDK code here.

We start by creating the DynamoDB table:

    table := awsdynamodb.NewTable(stack, jsii.String("dynamodb-table"),
        &awsdynamodb.TableProps{
            PartitionKey: &awsdynamodb.Attribute{
                Name: jsii.String("email"),
                Type: awsdynamodb.AttributeType_STRING},
        })

    table.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY)


We create the Lambda function (CDK will take care of building and deploying the function) and make sure we provide it with appropriate permissions to write to the DynamoDB table.

    function := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("kinesis-function"),
        &awscdklambdagoalpha.GoFunctionProps{
            Runtime:     awslambda.Runtime_GO_1_X(),
            Environment: &map[string]*string{"TABLE_NAME": table.TableName()},
            Entry:       jsii.String(functionDir),
        })

    table.GrantWriteData(function)
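The Environment property above passes the table name to the function as the TABLE_NAME environment variable. How the function reads it is not shown in this walkthrough, so the following is a minimal sketch under that assumption. The helper name tableName and the choice to inject the lookup function are illustrative, not the author's code.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// tableName looks up TABLE_NAME through getenv and rejects an empty value.
// Passing the lookup function in keeps the helper easy to test.
func tableName(getenv func(string) string) (string, error) {
	name := getenv("TABLE_NAME")
	if name == "" {
		return "", errors.New("environment variable TABLE_NAME is not set")
	}
	return name, nil
}

func main() {
	// In Lambda, os.Getenv sees the variables configured on the function.
	table, err := tableName(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Println("writing to table:", table)
}
```

Failing fast on a missing variable at startup tends to surface configuration mistakes sooner than a failed write on the first record would.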


Then, we create the Kinesis stream and add that as an event source to the Lambda function.

    kinesisStream := awskinesis.NewStream(stack, jsii.String("lambda-test-stream"), nil)

    function.AddEventSource(awslambdaeventsources.NewKinesisEventSource(kinesisStream, &awslambdaeventsources.KinesisEventSourceProps{
        StartingPosition: awslambda.StartingPosition_LATEST,
    }))


Finally, we export the Kinesis stream name and the DynamoDB table name as CloudFormation outputs.

    awscdk.NewCfnOutput(stack, jsii.String("kinesis-stream-name"),
        &awscdk.CfnOutputProps{
            ExportName: jsii.String("kinesis-stream-name"),
            Value:      kinesisStream.StreamName()})

    awscdk.NewCfnOutput(stack, jsii.String("dynamodb-table-name"),
        &awscdk.CfnOutputProps{
            ExportName: jsii.String("dynamodb-table-name"),
            Value:      table.TableName()})


Lambda Function

You can refer to the Lambda Function code here.

The Lambda function handler iterates over each record in the Kinesis event, and for each of them:

  • Unmarshals the JSON payload of the record into a Go struct
  • Stores the record's partition key as the primary key attribute (email) of the DynamoDB table
  • Stores the rest of the information from the payload in the same table item

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {

    for _, record := range kinesisEvent.Records {

        // Data holds the record payload (the JSON sent by the producer)
        data := record.Kinesis.Data

        var user CreateUserInfo
        if err := json.Unmarshal(data, &user); err != nil {
            return err
        }

        item, err := attributevalue.MarshalMap(user)
        if err != nil {
            return err
        }

        // the partition key (the user's email) becomes the table's primary key
        item["email"] = &types.AttributeValueMemberS{Value: record.Kinesis.PartitionKey}

        // client and table are defined elsewhere in the function code
        _, err = client.PutItem(ctx, &dynamodb.PutItemInput{
            TableName: aws.String(table),
            Item:      item,
        })
        if err != nil {
            return err
        }
    }

    return nil
}

type CreateUserInfo struct {
    Name string `json:"name"`
    City string `json:"city"`
}
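To experiment with the record-to-item mapping without deploying anything, the same steps can be approximated with the standard library alone. This is only a sketch: buildItem is a hypothetical helper, a plain map[string]string stands in for the DynamoDB attribute map, and the attribute names name and city are chosen for illustration (the names actually written to the table depend on how attributevalue.MarshalMap maps the struct fields).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CreateUserInfo matches the payload shape used in this post.
type CreateUserInfo struct {
	Name string `json:"name"`
	City string `json:"city"`
}

// buildItem is a hypothetical helper: it unmarshals the record payload and
// combines it with the partition key, which serves as the email primary key.
func buildItem(partitionKey string, data []byte) (map[string]string, error) {
	var user CreateUserInfo
	if err := json.Unmarshal(data, &user); err != nil {
		return nil, fmt.Errorf("unmarshal payload for %q: %w", partitionKey, err)
	}
	return map[string]string{
		"email": partitionKey,
		"name":  user.Name,
		"city":  user.City,
	}, nil
}

func main() {
	item, err := buildItem("user1@foo.com", []byte(`{"name":"user1", "city":"seattle"}`))
	if err != nil {
		fmt.Println("skipping record:", err)
		return
	}
	fmt.Println(item["email"], item["name"], item["city"])
}
```

Keeping the transformation in a small pure function like this makes it possible to unit test the mapping separately from the DynamoDB call.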


Wrap Up

In this blog, you saw an example of how to use Lambda to process messages in a Kinesis stream and store them in DynamoDB, thanks to the Kinesis and Lambda integration. The entire infrastructure life-cycle was automated using AWS CDK.

All this was done using the Go programming language, which is well-supported in DynamoDB, AWS Lambda, and AWS CDK.

Happy building!

Published at DZone with permission of Abhishek Gupta, DZone MVB. See the original article here.
