Cassandra With Spark 2.0: Building a REST API
In this tutorial, we will demonstrate how to build a REST service on Spark, using Akka HTTP as a sidekick and Cassandra as the data store.
We have seen the power of Spark before, and when it is combined with Cassandra in the right way, it becomes even more powerful. We have already covered building a REST API with Spark and Couchbase in an earlier blog post, so this one shows how to do the same thing with Cassandra.
So let's get started with the code:
Your build.sbt should look like this:
name := "cassandra-spark-akka-http-starter-kit"
version := "1.0"
scalaVersion := "2.11.8"
organization := "com.knoldus"
val akkaV = "2.4.5"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.0.0",
"org.apache.spark" % "spark-sql_2.11" % "2.0.0",
"com.typesafe.akka" %% "akka-http-core" % akkaV,
"com.typesafe.akka" %% "akka-http-experimental" % akkaV,
"com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
"org.scalatest" %% "scalatest" % "2.2.6" % "test",
"com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.0-M3",
"net.liftweb" % "lift-json_2.11" % "2.6.2"
)
assembleArtifact in assemblyPackageScala := false // We don't need the Scala library, Spark already includes it
assemblyMergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}
ivyScala := ivyScala.value map {
_.copy(overrideScalaVersion = true)
}
fork in run := true
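The Context object in the database access layer below reads its Cassandra settings from Typesafe config, so src/main/resources/application.conf must define the cassandra.url, cassandra.keyspace, and cassandra.tableName keys. A minimal sketch (the host, keyspace, and table values here are placeholders for your own setup):
cassandra {
  url = "127.0.0.1"
  keyspace = "test_keyspace"
  tableName = "test_table"
}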
Database Access Layer:
And your Database Access layer should look like this:
package com.knoldus.factories
import com.knoldus.domain.User
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._
import scala.util.Try
/**
* Created by shivansh on 9/5/16.
*/
trait DatabaseAccess {

  import Context._

  // Persist a single user by turning it into a one-element RDD and
  // saving it to Cassandra; returns false if the write fails.
  def create(user: User): Boolean =
    Try(sc.parallelize(Seq(user)).saveToCassandra(keyspace, tableName)).toOption.isDefined

  // Fetch all users matching the given id; None signals a failed read.
  def retrieve(id: String): Option[Array[User]] =
    Try(sc.cassandraTable[User](keyspace, tableName).where(s"id='$id'").collect()).toOption
}

object DatabaseAccess extends DatabaseAccess

object Context {
  val config = ConfigFactory.load()
  val url = config.getString("cassandra.url")
  val sparkConf: SparkConf = new SparkConf()
    .setAppName("Spark-cassandra-akka-rest-example")
    .setMaster("local[4]")
    .set("spark.cassandra.connection.host", url)
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val sc = spark.sparkContext
  val keyspace = config.getString("cassandra.keyspace")
  val tableName = config.getString("cassandra.tableName")
}
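The layer above also depends on a User domain class, which the post does not show. Going by how it is constructed in the routes, User(documentId, name, email), and by the connector's default mapping of case-class fields to column names, a minimal sketch could look like this (the keyspace and table names in the comment are placeholders):
package com.knoldus.domain

// The Spark-Cassandra connector maps case-class field names to
// Cassandra column names, so the backing table needs matching
// columns, e.g.:
//   CREATE TABLE test_keyspace.test_table (
//     id text PRIMARY KEY, name text, email text);
case class User(id: String, name: String, email: String)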
Service Layer:
Now your routing file should look like this:
package com.knoldus.routes

import java.util.{Date, UUID}

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.ActorMaterializer
import com.knoldus.domain.User
import com.knoldus.factories.DatabaseAccess
import net.liftweb.json._
import net.liftweb.json.Extraction._
trait SparkService extends DatabaseAccess {

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer

  // Lazy so it is created only after the concrete ActorSystem has been
  // initialized by whatever mixes this trait in.
  lazy val logger = Logging(system, getClass)

  implicit def myExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case e: ArithmeticException =>
        extractUri { uri =>
          complete(HttpResponse(StatusCodes.InternalServerError, entity = "Data is not persisted and something went wrong"))
        }
    }

  implicit val formats: Formats = new DefaultFormats {
    outer =>
    override val typeHintFieldName = "type"
    override val typeHints = ShortTypeHints(List(classOf[String], classOf[Date]))
  }

  // GET /create/name/<name>/email/<email> persists a user,
  // GET /retrieve/id/<id> reads one back as JSON.
  val sparkRoutes: Route = {
    get {
      path("create" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
        complete {
          val documentId = "user::" + UUID.randomUUID().toString
          try {
            val user = User(documentId, name, email)
            val isPersisted = create(user)
            if (isPersisted) {
              HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
            } else {
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
          }
        }
      }
    } ~ path("retrieve" / "id" / Segment) { (listOfIds: String) =>
      get {
        complete {
          try {
            val idAsRDD: Option[Array[User]] = retrieve(listOfIds)
            idAsRDD match {
              case Some(data) => HttpResponse(StatusCodes.OK, entity = data.headOption.fold("")(x => compact(render(decompose(x)))))
              case None => HttpResponse(StatusCodes.InternalServerError, entity = "Data is not fetched and something went wrong")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for ids : $listOfIds")
          }
        }
      }
    }
  }
}
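SparkService leaves the ActorSystem and ActorMaterializer abstract, so a small bootstrap is still needed to provide them and bind the routes to a port. The post does not show one; a minimal sketch (the object name, interface, and port are assumptions) could be:
package com.knoldus

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.knoldus.routes.SparkService

object StartApplication extends App with SparkService {
  implicit val system: ActorSystem = ActorSystem("cassandra-spark-akka-http")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Bind the routes defined in SparkService to an HTTP endpoint.
  Http().bindAndHandle(sparkRoutes, "localhost", 8080)
}
Once it is running, hitting http://localhost:8080/create/name/john/email/john@knoldus.com should persist a user and return the generated id, and http://localhost:8080/retrieve/id/<documentId> should return that user as JSON.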