
Cassandra With Spark 2.0: Building a REST API

In this tutorial, we will demonstrate how to build a REST service with Spark, using Akka HTTP as a sidekick and Cassandra as the data store.


We have seen the power of Spark before, and when it is combined with Cassandra the right way, it becomes even more powerful. We have already covered building a REST API on Spark with Couchbase in an earlier blog post, so here we will do the same thing with Cassandra.

So let's get started with the code:

Your build.sbt should look like this:


name := "cassandra-spark-akka-http-starter-kit"

version := "1.0"

scalaVersion := "2.11.8"

organization := "com.knoldus"

val akkaV = "2.4.5"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "2.0.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.0.0",
  "com.typesafe.akka" %% "akka-http-core" % akkaV,
  "com.typesafe.akka" %% "akka-http-experimental" % akkaV,
  "com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
  "org.scalatest" %% "scalatest" % "2.2.6" % "test",
  "com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.0-M3",
  "net.liftweb" % "lift-json_2.11" % "2.6.2"

)

assembleArtifact in assemblyPackageScala := false // We don't need the Scala library; Spark already includes it

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

ivyScala := ivyScala.value map {
  _.copy(overrideScalaVersion = true)
}
fork in run := true
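
The assembly settings above come from the sbt-assembly plugin, so it has to be enabled in project/plugins.sbt. The plugin version below is an assumption that matches the sbt tooling of this era:

// project/plugins.sbt — provides the `assembly` task and settings used in build.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")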

Database Access Layer:

Your Database Access layer should look like this:

package com.knoldus.factories

import com.knoldus.domain.User
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._

import scala.util.Try

/**
  * Created by shivansh on 9/5/16.
  */
trait DatabaseAccess {

  import Context._

  // Persist a user; returns true when the Cassandra write succeeds.
  def create(user: User): Boolean =
    Try(sc.parallelize(Seq(user)).saveToCassandra(keyspace, tableName)).toOption.isDefined

  // Fetch users by id; `where` supports parameter binding, which avoids hand-quoting the value.
  def retrieve(id: String): Option[Array[User]] =
    Try(sc.cassandraTable[User](keyspace, tableName).where("id = ?", id).collect()).toOption
}

object DatabaseAccess extends DatabaseAccess


object Context {
  val config = ConfigFactory.load()
  val url = config.getString("cassandra.url")
  val sparkConf: SparkConf = new SparkConf().setAppName("Spark-cassandra-akka-rest-example").setMaster("local[4]")
    .set("spark.cassandra.connection.host", url)
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val sc = spark.sparkContext
  val keyspace = config.getString("cassandra.keyspace")
  val tableName = config.getString("cassandra.tableName")
}
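
The code above assumes two things that are not shown in the original post: a User case class in com.knoldus.domain and Cassandra settings in src/main/resources/application.conf. Minimal sketches follow; the field names are inferred from the routes in the next section, and the connection values are placeholders:

package com.knoldus.domain

// Fields mirror the columns of the Cassandra table the users are written to.
case class User(id: String, name: String, email: String)

# src/main/resources/application.conf (placeholder values)
cassandra {
  url = "127.0.0.1"
  keyspace = "user_keyspace"
  tableName = "user"
}

The keyspace and table must also exist before the service starts; for example, in cqlsh:

CREATE KEYSPACE IF NOT EXISTS user_keyspace
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS user_keyspace.user (id text PRIMARY KEY, name text, email text);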

Service Layer:

Now your routing file should look like this:

package com.knoldus.routes

import java.util.UUID

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.ActorMaterializer
import com.knoldus.domain.User
import com.knoldus.factories.DatabaseAccess
import net.liftweb.json._
import java.util.Date
import net.liftweb.json.Extraction._


trait SparkService extends DatabaseAccess {

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer
  // Lazy so that mixing this trait into an object cannot touch `system` before it is initialized.
  lazy val logger = Logging(system, getClass)


  implicit def myExceptionHandler =
    ExceptionHandler {
      case e: ArithmeticException =>
        extractUri { uri =>
          complete(HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not persisted and something went wrong"))
        }
    }

  implicit val formats: Formats = new DefaultFormats {
    outer =>
    override val typeHintFieldName = "type"
    override val typeHints = ShortTypeHints(List(classOf[String], classOf[Date]))
  }

  val sparkRoutes: Route = {
    get {
      path("create" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
        complete {
          val documentId = "user::" + UUID.randomUUID().toString
          try {
            val user = User(documentId, name, email)
            val isPersisted = create(user)
            if (isPersisted) {
              HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
            } else {
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
          }
        }
      }
    } ~ path("retrieve" / "id" / Segment) { (listOfIds: String) =>
      get {
        complete {
          try {
            val idAsRDD: Option[Array[User]] = retrieve(listOfIds)
            idAsRDD match {
              case Some(data) => HttpResponse(StatusCodes.OK, entity = data.headOption.fold("")(x => compact(render(decompose(x)))))
              case None => HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not fetched and something went wrong")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for ids : $listOfIds")
          }
        }
      }
    }
  }
}
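
Bootstrap:

The original post does not show the entry point that actually starts the server, so here is a minimal sketch; the object name, actor system name, and port are assumptions:

package com.knoldus.bootstrap

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.knoldus.routes.SparkService

object StartApplication extends App with SparkService {

  implicit val system = ActorSystem("cassandra-spark-akka-http")
  implicit val materializer = ActorMaterializer()

  // Bind the routes from SparkService; the actor system's threads keep the JVM alive.
  Http().bindAndHandle(sparkRoutes, "localhost", 8080)
}

With the server running, the endpoints can be exercised with curl (the port matches the sketch above):

curl "http://localhost:8080/create/name/John/email/john@example.com"
curl "http://localhost:8080/retrieve/id/user::<uuid-returned-by-create>"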



Topics:
scala, akka, spark, cassandra, rest api, rest api services, spark 2.0.0, connectors

Published at DZone with permission of Shivansh Srivastava, DZone MVB. See the original article here.
