Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

The Rise of Scanamo: Async Access for DynamoDB in Scala

DZone's Guide to

The Rise of Scanamo: Async Access for DynamoDB in Scala

Let's take a look at how to use Scanamo, which is a library to use DynamoDB with Scala in a simpler manner.

· Database Zone ·
Free Resource

Built by the engineers behind Netezza and the technology behind Amazon Redshift, AnzoGraph is a native, Massively Parallel Processing (MPP) distributed Graph OLAP (GOLAP) database that executes queries more than 100x faster than other vendors.  

Scanamo is a library to use DynamoDB with Scala in a simpler manner with less error-prone code.

Now, the question is  “Why should anyone use it?” The answer is very simple. As DynamoDB clients provided by AWS are not available in Scala DSL, there are a number of libraries available for DynamoDB to write your queries in Scala. What makes Scanamo different from other available libraries? As there are two clients provided by the AWS of Java SDK that is described as below:

  • AmazonDynamoDB: This is a blocking client supported by Scanamo, which works in a synchronous manner.
  • AmazonDynamoDBAsync: Scanamo also supports making the requests asynchronously using this client that implements the async interface.

Obviously, using an AmazonDynamoDBAsync client is a better option, but it’s still not truly async as it relies on Java Futures, which block as soon as you try to access the value within them. Underneath the hood, they make use of a thread pool to perform a blocking call when making the HTTP request to DynamoDB. There is a possibility that you may not be able to reach your provisioned throughput because you have exhausted the thread pool to make HTTP requests.

Due to this problem, the real use of Scanamo comes into the picture as it provides another DynamoDB client, named ScanamoAlpakka, which is purely async in nature. It works on the JVM in the form of an Akka-stream connector, part of the Alpakka project supported by Lightbend.

In order to use the Alpakka connector, you need to import the scanamo-alpakka library dependency.

val scanamoV = "<latest scanamo version>"
libraryDependencies += "com.gu" %% "scanamo-alpakka" % scanamoV

Even to create a DynamoDB client using Alpakka is quite a simple process. These are the  steps that we need to follow:

  • Before you can construct the client, you need an ActorSystem, ActorMaterializer, and ExecutionContext.
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
  • You can then create the client with a settings object.
val settings = DynamoSettings(system)
val client = DynamoClient(settings)

That’s all you need to create the dynamo client using alpakka. Apart from this,  you also need to define these configurations in your application.conf.

akka.stream.alpakka.dynamodb {
    region = "eu-west-1"
    host = "localhost"
    port: 8000
    parallelism = 2
  }

To use AWS credentials for dynamoDB, you have to export the below variables in your environment.

export AWS_ACCESS_KEY_ID=your_access_key_id          // has to be provided
export AWS_SECRET_ACCESS_KEY=your_secret_access_key  // has to be provided

Now, the only thing left is to execute our query using Alpakka connector. So, Scanamo supports all the types of operations for DynamoDB.

  • Basic CRUD
  • Batch Operations
  • Conditionals Operations
  • Filters (not recommended)
  • Using Indexes

Here are some basic examples for different operations in DynamoDB using Alpakka.

/**
      * CRUD
      **/

    def put(employee : Employee) : Future[Option[Either[DynamoReadError, Employee]]] =
        ScanamoAlpakka.put[Employee](client)(TableName)(employee)

    def get(name : String, id : Long) : Future[Option[Either[DynamoReadError, Employee]]] =
        ScanamoAlpakka.get[Employee](client)(TableName)('name -> name and 'id -> id)

    def delete(name : String, id : Long) : Future[DeleteItemResult] =
        ScanamoAlpakka.delete[Employee](client)(TableName)('name -> name and 'id -> id)

    def scan : Future[List[Either[DynamoReadError, Employee]]] = //also there is method called scan with limits
        ScanamoAlpakka.scan[Employee](client)(TableName)

    def query(name : String) : Future[List[Either[DynamoReadError, Employee]]] =
        ScanamoAlpakka.query[Employee](client)(TableName)('name -> name and 'id > 0)

    def update(employee : Employee) : Future[Either[DynamoReadError, Employee]] =
        ScanamoAlpakka.update[Employee](client)(TableName)(
            'name -> employee.name and 'id -> employee.id,
            set('code -> employee.code)
        )

    /**
      * Batch Operations*/

    def putAll(employees : Set[Employee]) : Future[List[BatchWriteItemResult]] =
        ScanamoAlpakka.putAll[Employee](client)(TableName)(employees)

    def getAll(names : Set[String]) : Future[Set[Either[DynamoReadError, Employee]]] =
        ScanamoAlpakka.getAll[Employee](client)(TableName)('name -> names)

    def deleteAll(names : Set[String]) : Future[List[BatchWriteItemResult]] =
        ScanamoAlpakka.deleteAll(client)(TableName)('name -> names)

    /**
      * Conditional*/

    def putIfNotExist(employee : Employee) : Future[Either[ConditionalCheckFailedException, PutItemResult]] = {
        ScanamoAlpakka.exec(client)(table.given(not(attributeExists('name)))
            .put(employee))
    }

    def deleteIfExist(employee : Employee) : Future[Either[ConditionalCheckFailedException, DeleteItemResult]] = {
        ScanamoAlpakka.exec(client)(table.given('id > 0)
            .delete('name -> employee.name))
    }

    def updateIfExist(employee : Employee) : Future[Either[ScanamoError, Employee]] = {
        ScanamoAlpakka.exec(client)(table.given('id > 0)
            .update('name -> employee.name,
                set('code -> employee.code)))
    }

    /**
      * Filters*/

    def scanWithId(id : Long) : Future[List[Either[DynamoReadError, Employee]]] = {
        ScanamoAlpakka.exec(client)(table
            .filter('id -> id)
            .scan())
    }

    def queryWithId(code : String) : Future[List[Either[DynamoReadError, Employee]]] = {
        ScanamoAlpakka.exec(client)(table
            .filter('code -> Code(List(code)))
            .query('name -> "name"))
}

To see the full implementation of the above code, you can go through this GitHub repo.

References:

Download AnzoGraph now and find out for yourself why it is acknowledged as the most complete all-in-one data warehouse for BI style and graph analytics.  

Topics:
scala ,nosql ,dynamodb ,async ,database ,tutorial

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}