DynamoDB Go SDK: How To Use the Scan and Batch Operations Efficiently

The DynamoDB Scan API accesses every item in a table (or secondary index). In this article, learn how to use Scan API with the DynamoDB Go SDK.

By Abhishek Gupta · Dec. 16, 22 · Tutorial

The DynamoDB Scan API accesses every item in a table (or secondary index). It is the equivalent of a select * from query. One of the things I will cover in this blog is how to use Scan API with the DynamoDB Go SDK.

To scan a table, we need some data to begin with! So in the process, I will also go into how to use the Batch API to write bulk data into DynamoDB. You can use the BatchWriteItem API to create or delete items in batches of up to 25, and you can combine these operations across multiple tables.

We will start simple and gradually improve our approach to using the APIs efficiently. I will also go over some of the basic tests that I ran to demonstrate incremental improvements. Finally, I will wrap up by highlighting some of the considerations while using these operations.

You can refer to the code on GitHub.

Before You Proceed

Make sure to create a DynamoDB table called users with the following configuration (a programmatic way to do this is sketched below):

  • Partition key email (data type String)
  • On-Demand capacity mode

[Image: DynamoDB users table]
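
If you would rather create the table programmatically than through the console, here is a minimal sketch using the Go SDK (this is my own example rather than code from the article's repository):

// a minimal sketch (not from the article's repo): create the users table with
// partition key "email" and on-demand (pay-per-request) capacity mode
package main

import (
    "context"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.Background())
    if err != nil {
        log.Fatal("failed to load AWS config ", err)
    }
    client := dynamodb.NewFromConfig(cfg)

    _, err = client.CreateTable(context.Background(), &dynamodb.CreateTableInput{
        TableName: aws.String("users"),
        AttributeDefinitions: []types.AttributeDefinition{
            {AttributeName: aws.String("email"), AttributeType: types.ScalarAttributeTypeS},
        },
        KeySchema: []types.KeySchemaElement{
            {AttributeName: aws.String("email"), KeyType: types.KeyTypeHash},
        },
        BillingMode: types.BillingModePayPerRequest,
    })
    if err != nil {
        log.Fatal("create table failed ", err)
    }
    log.Println("create table request submitted - wait for the table to become ACTIVE")
}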

Also, I want to call out a few things to set the context:

  • The table was created in us-east-1 and tests were executed from an EC2 instance in us-east-1 as well
  • Since these are general tests instead of specialized benchmarks, I did not do any special tuning (at any level). These are just Go functions that were executed with different inputs, keeping things as simple as possible.
  • The tests include marshaling (converting a Go struct to DynamoDB data types) for BatchWriteItem operations and un-marshaling (converting from DynamoDB data types back to a Go struct) for the Scan operation; the shared setup these snippets rely on is sketched below.
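
The snippets in this post refer to a few shared pieces: a DynamoDB client, a table name, and a User struct. The full code is in the GitHub repository; the sketch below is my assumption of what that shared setup roughly looks like, not the repo's verbatim code:

// assumed shared setup for the snippets in this post (a sketch, not the repo's exact code);
// the snippets also use github.com/aws/aws-sdk-go-v2/aws,
// github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue,
// github.com/aws/aws-sdk-go-v2/service/dynamodb/types, and github.com/google/uuid
package main

import (
    "context"
    "log"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

const table = "users"

var client *dynamodb.Client

// User maps to an item in the users table; attributevalue.MarshalMap and
// attributevalue.UnmarshalMap convert between this struct and DynamoDB attribute values
type User struct {
    Email string `dynamodbav:"email"`
    Age   int    `dynamodbav:"age"`
    City  string `dynamodbav:"city"`
}

func init() {
    cfg, err := config.LoadDefaultConfig(context.Background())
    if err != nil {
        log.Fatal("failed to load AWS config ", err)
    }
    client = dynamodb.NewFromConfig(cfg)
}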

Let's start off by exploring the BatchWriteItem API. This way, we will also have data to work with for the Scan operations.

Win-win!

Importing Data in Batches

Since you can combine up to 25 items in a single invocation, using a batch approach for bulk data imports is much better than invoking PutItem in a loop (or even in parallel).

Here is a basic example of how you would use BatchWriteItem:

func basicBatchImport() {

    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batch := make(map[string][]types.WriteRequest)
    var requests []types.WriteRequest

    for i := 1; i <= 25; i++ {
        user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
        item, _ := attributevalue.MarshalMap(user)
        requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
    }

    batch[table] = requests

    op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
        RequestItems: batch,
    })
    if err != nil {
        log.Fatal("batch write error", err)
    } else {
        log.Println("batch insert done")
    }

    if len(op.UnprocessedItems) != 0 {
        log.Println("there were", len(op.UnprocessedItems), "unprocessed records")
    }

    log.Println("inserted", (25 - len(op.UnprocessedItems)), "records in", time.Since(startTime).Seconds(), "seconds")
}


With BatchWriteItemInput, we can define the operations we want to perform in the batch - here we are just going to perform PutRequests (each of which is encapsulated within another type called WriteRequest).

We assemble the WriteRequests in a slice and finally, put them in a map with the key being the table name: this is exactly what the RequestItems attribute in BatchWriteItemInput needs.

In this case, we are dealing with a single table but you could execute operations on multiple tables.
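
For illustration, spanning multiple tables just means adding more keys to the same RequestItems map. The orders table and the request slices below are hypothetical:

// hypothetical sketch: one BatchWriteItem call spanning two tables
batch := map[string][]types.WriteRequest{
    "users":  userRequests,  // []types.WriteRequest targeting the users table
    "orders": orderRequests, // []types.WriteRequest targeting a hypothetical orders table
}

_, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
    RequestItems: batch,
})
if err != nil {
    log.Fatal("batch write error", err)
}

Note that the limit of 25 requests applies to the call as a whole, not per table.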

In this example, we just dealt with one batch of 25 records (maximum permitted batch size). If we want to import more records, all we need to do is split them into batches of 25 and execute them one (sub)batch at a time. Simple enough - here is an example:

func basicBatchImport2(total int) {

    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batchSize := 25
    processed := total

    for num := 1; num <= total; num = num + batchSize {

        batch := make(map[string][]types.WriteRequest)
        var requests []types.WriteRequest

        start := num
        // cap the final (possibly partial) batch so we never go past "total"
        end := num + batchSize - 1
        if end > total {
            end = total
        }

        for i := start; i <= end; i++ {
            user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
            item, _ := attributevalue.MarshalMap(user)
            requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
        }

        batch[table] = requests

        op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
            RequestItems: batch,
        })

        if err != nil {
            log.Fatal("batch write error", err)
        }

        if len(op.UnprocessedItems) != 0 {
            processed = processed - len(op.UnprocessedItems)
        }
    }

    log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")

    if processed != total {
        log.Println("there were", (total - processed), "unprocessed records")
    }
}


I tried this with 50000 records (which means 2000 batches) and it took approximately 15 seconds. But we can do much better!

Parallel Batch Import

Instead of processing each batch sequentially, we can spin up a goroutine for each batch:

func parallelBatchImport(numRecords int) {

    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batchSize := 25

    var wg sync.WaitGroup

    // int64 so that unprocessed counts can be decremented atomically across goroutines (requires "sync/atomic")
    processed := int64(numRecords)

    for num := 1; num <= numRecords; num = num + batchSize {
        start := num
        // cap the final (possibly partial) batch so we never go past "numRecords"
        end := num + batchSize - 1
        if end > numRecords {
            end = numRecords
        }

        wg.Add(1)

        go func(s, e int) {
            defer wg.Done()

            batch := make(map[string][]types.WriteRequest)
            var requests []types.WriteRequest

            for i := s; i <= e; i++ {
                user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}

                item, err := attributevalue.MarshalMap(user)
                if err != nil {
                    log.Fatal("marshal map failed", err)
                }
                requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
            }

            batch[table] = requests

            op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
                RequestItems: batch,
            })

            if err != nil {
                log.Fatal("batch write error", err)
            }

            if len(op.UnprocessedItems) != 0 {
                atomic.AddInt64(&processed, -int64(len(op.UnprocessedItems))) // avoid a data race
            }

        }(start, end)
    }

    log.Println("waiting for all batches to finish....")
    wg.Wait()

    log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")

    if processed != int64(numRecords) {
        log.Println("there were", (int64(numRecords) - processed), "unprocessed records")
    }
}


The results improved by a good margin. Here is what I got, on average:

  • Inserting 50000 records took ~2.5 seconds.
  • Inserting 100000 records took ~4.5 to 5 seconds.
  • Inserting 150000 records took less than 9.5 seconds.
  • Inserting 200000 records took less than 11.5 seconds.

There may be unprocessed records in a batch. This example detects those records, but the retry logic has been skipped to keep things simple. Ideally, you should also have an exponential backoff-based retry mechanism for handling unprocessed records, as sketched below.
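
Here is a minimal sketch of what such a retry helper could look like (the function name, attempt count, and backoff values are my own; it simply re-submits UnprocessedItems with an increasing delay):

// sketch: retry unprocessed items with exponential backoff
// (names and limits are illustrative; uses the "time" and "fmt" packages)
func batchWriteWithRetry(ctx context.Context, requests map[string][]types.WriteRequest) error {
    backoff := 100 * time.Millisecond
    const maxAttempts = 5

    for attempt := 1; attempt <= maxAttempts; attempt++ {
        op, err := client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
            RequestItems: requests,
        })
        if err != nil {
            return err
        }

        if len(op.UnprocessedItems) == 0 {
            return nil
        }

        // re-submit only what DynamoDB could not process, after backing off
        requests = op.UnprocessedItems
        time.Sleep(backoff)
        backoff = backoff * 2
    }

    return fmt.Errorf("unprocessed items remained after %d attempts", maxAttempts)
}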

To insert more data, I ran the parallelBatchImport function (above) in loops. For example:

for i := 1; i <= 100; i++ {
    parallelBatchImport(50000)
}


Alright, let's move ahead. Now that we have some data, let's try...

The Scan API

This is what basic usage looks like:

func scan() {
    startTime := time.Now()

    op, err := client.Scan(context.Background(), &dynamodb.ScanInput{
        TableName:              aws.String(table),
        ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
    })

    if err != nil {
        log.Fatal("scan failed", err)
    }

    for _, i := range op.Items {
        var u User
        err := attributevalue.UnmarshalMap(i, &u)
        if err != nil {
            log.Fatal("unmarshal failed", err)
        }
    }

    if op.LastEvaluatedKey != nil {
        log.Println("all items have not been scanned")
    }
    log.Println("scanned", op.ScannedCount, "items in", time.Since(startTime).Seconds(), "seconds")
    log.Println("consumed capacity", *op.ConsumedCapacity.CapacityUnits)
}


Just provide the table (or secondary index) name and you are good to go! However, chances are that you will not be able to get all the items because of the API limit: each invocation returns at most 1 MB worth of data. In my case, it took about 0.5 seconds for approximately 15000 records; the rest of the items were skipped because the 1 MB limit was breached.

Using Pagination

To handle this limitation, the Scan API returns a LastEvaluatedKey in its output that points to the last processed record. All you need to do is invoke Scan again, with the ExclusiveStartKey attribute set to the value of LastEvaluatedKey from the previous response.

Using a paginated scan approach took me approximately 100 seconds to scan ~7.5 million records.
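
The paginated version is not shown above, but it boils down to a loop along these lines (a sketch; the repository may structure it differently):

// sketch of a paginated scan: keep calling Scan until LastEvaluatedKey comes back empty
func paginatedScan() {
    startTime := time.Now()
    var scanned int
    var lastEvaluatedKey map[string]types.AttributeValue

    for {
        scip := &dynamodb.ScanInput{TableName: aws.String(table)}
        if len(lastEvaluatedKey) != 0 {
            scip.ExclusiveStartKey = lastEvaluatedKey
        }

        op, err := client.Scan(context.Background(), scip)
        if err != nil {
            log.Fatal("scan failed", err)
        }

        for _, i := range op.Items {
            var u User
            if err := attributevalue.UnmarshalMap(i, &u); err != nil {
                log.Fatal("unmarshal failed", err)
            }
        }

        scanned = scanned + int(op.ScannedCount)

        if len(op.LastEvaluatedKey) == 0 {
            break
        }
        lastEvaluatedKey = op.LastEvaluatedKey
    }

    log.Println("scanned", scanned, "items in", time.Since(startTime).Seconds(), "seconds")
}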

Parallel Scan

Pagination helps, but it's still a sequential process. There is a lot of scope for improvement. Thankfully, Scan allows you to adopt a parallelized approach; i.e., you can use multiple workers (goroutines in this case) to process data in parallel!

func parallelScan(pageSize, totalWorkers int) {
    log.Println("parallel scan with page size", pageSize, "and", totalWorkers, "goroutines")
    startTime := time.Now()

    var total int64 // updated atomically by the segment goroutines (requires "sync/atomic")

    var wg sync.WaitGroup
    wg.Add(totalWorkers)

    for i := 0; i < totalWorkers; i++ {
        // start a goroutine for each segment

        go func(segId int) {
            var segTotal int

            defer wg.Done()

            lastEvaluatedKey := make(map[string]types.AttributeValue)

            scip := &dynamodb.ScanInput{
                TableName:     aws.String(table),
                Limit:         aws.Int32(int32(pageSize)),
                Segment:       aws.Int32(int32(segId)),
                TotalSegments: aws.Int32(int32(totalWorkers)),
            }

            for {
                if len(lastEvaluatedKey) != 0 {
                    scip.ExclusiveStartKey = lastEvaluatedKey
                }
                op, err := client.Scan(context.Background(), scip)

                if err != nil {
                    log.Fatal("scan failed", err)
                }

                segTotal = segTotal + int(op.Count)

                for _, i := range op.Items {

                    var u User
                    err := attributevalue.UnmarshalMap(i, &u)
                    if err != nil {
                        log.Fatal("unmarshal failed", err)
                    }
                }

                if len(op.LastEvaluatedKey) == 0 {
                    log.Println("[ segment", segId, "] finished")
                    atomic.AddInt64(&total, int64(segTotal)) // avoid a data race across segments
                    log.Println("total records processed by segment", segId, "=", segTotal)
                    return
                }

                lastEvaluatedKey = op.LastEvaluatedKey
            }
        }(i)
    }

    log.Println("waiting...")
    wg.Wait()

    log.Println("done...")
    log.Println("scanned", total, "items in", time.Since(startTime).Seconds(), "seconds")
}


The Segment and TotalSegments attributes are the key to how the Scan API enables parallelism. TotalSegments is simply the number of threads/goroutines/worker processes that need to be spawned, and Segment is a unique identifier for each of them.

In my tests, the Scan performance remained (almost) constant at 37-40 seconds (average) for about ~ 7.5 million records (I tried a variety of page sizes and goroutine combinations).

How Many TotalSegments Do I Need to Configure?

To tune the appropriate number of parallel workers, you might need to experiment a bit. A lot depends on your client environment:

  • Do you have enough compute resources?
  • Some environments/runtimes might have managed thread pools, so you will have to work within those limits.

So, you will need to try things out to find the optimum parallelism. One way to think about it could be to choose one segment (single worker/thread/goroutine) per unit of data (say a segment for every GB of data you want to scan).
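
As a purely illustrative example of that rule of thumb, a roughly 25 GB table would start out with about 25 segments (the numbers and the page size below are assumptions to experiment with):

// illustrative only: derive TotalSegments from the approximate table size (one segment per GB);
// DescribeTable's TableSizeBytes can give you the real number
const gb = 1024 * 1024 * 1024

approxTableSizeBytes := int64(25) * gb // e.g., a ~25 GB table
totalSegments := int(approxTableSizeBytes / gb)
if totalSegments < 1 {
    totalSegments = 1
}

parallelScan(5000, totalSegments) // a page size of 5000 is just a starting point to experiment with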

Wrap-Up: API Considerations

Both Batch and Scan APIs are quite powerful, but there are nuances you should be aware of. My advice is to read the API documentation thoroughly.

With Batch APIs:

  • There are certain limits:
    • No more than 25 requests in a batch.
    • Individual items in a batch should not exceed 400 KB.
    • The total size of items in a single BatchWriteItem call cannot be more than 16 MB.
  • BatchWriteItem cannot update items.
  • You cannot specify conditions on individual put and delete requests.
  • It does not return deleted items in the response (a batch delete request is sketched after this list).
  • If there are failed operations, you can access them via the UnprocessedItems response parameter.
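
For completeness, here is a sketch of what a batch delete looks like; each DeleteRequest only needs the item's key, and the emailsToDelete slice is hypothetical:

// sketch: deleting items in a batch - each DeleteRequest only needs the key (email, in this table)
// emailsToDelete is a hypothetical []string of partition key values
var deleteRequests []types.WriteRequest
for _, email := range emailsToDelete {
    deleteRequests = append(deleteRequests, types.WriteRequest{
        DeleteRequest: &types.DeleteRequest{
            Key: map[string]types.AttributeValue{
                "email": &types.AttributeValueMemberS{Value: email},
            },
        },
    })
}

_, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
    RequestItems: map[string][]types.WriteRequest{table: deleteRequests},
})
if err != nil {
    log.Fatal("batch delete error", err)
}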

Use Scan Wisely

Since a Scan operation goes over the entire table (or secondary index), it is highly likely to consume a large chunk of the provisioned throughput, especially if it's a large table. For that reason, Scan should be your last resort: check whether the Query API (or BatchGetItem) works for your use case.

The same applies to parallel Scan.

You can further narrow down the results by using a FilterExpression, a Limit parameter (as demonstrated earlier), or a ProjectionExpression to return only a subset of attributes, as in the sketch below.
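
For example, a scan that returns only a couple of attributes for users above a certain age could look something like this (a sketch, not code from the repository; the attribute names come from the sample data used earlier):

// sketch: narrow down a scan with a filter expression and a projection
op, err := client.Scan(context.Background(), &dynamodb.ScanInput{
    TableName:            aws.String(table),
    FilterExpression:     aws.String("#age > :minAge"),
    ProjectionExpression: aws.String("#email, #city"),
    ExpressionAttributeNames: map[string]string{
        "#age":   "age",
        "#email": "email",
        "#city":  "city",
    },
    ExpressionAttributeValues: map[string]types.AttributeValue{
        ":minAge": &types.AttributeValueMemberN{Value: "30"},
    },
    Limit: aws.Int32(100),
})
if err != nil {
    log.Fatal("scan failed", err)
}
log.Println("matched", op.Count, "items")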

That's all for this blog. I hope you found it useful.

Until next time, Happy coding!


Published at DZone with permission of Abhishek Gupta, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
