
Cassandra With Spark 2.0: Building a REST API


In this tutorial, we will demonstrate how to build a REST service on Spark, using Akka HTTP as a sidekick and Cassandra as the data store.


We have seen the power of Spark in earlier posts, and when it is combined with Cassandra the right way, it becomes even more powerful. We have already covered how to build a REST API on Spark and Couchbase in this blog post, so this one shows how to do the same thing with Cassandra.

So let's get started with the code:

Your build.sbt should look like this:


name := "cassandra-spark-akka-http-starter-kit"

version := "1.0"

scalaVersion := "2.11.8"

organization := "com.knoldus"

val akkaV = "2.4.5"
libraryDependencies ++= Seq(
  // Spark 2.0 core and SQL
  "org.apache.spark" % "spark-core_2.11" % "2.0.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.0.0",
  // Akka HTTP (still marked experimental in Akka 2.4.x)
  "com.typesafe.akka" %% "akka-http-core" % akkaV,
  "com.typesafe.akka" %% "akka-http-experimental" % akkaV,
  "com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
  "org.scalatest" %% "scalatest" % "2.2.6" % "test",
  // DataStax connector between Spark and Cassandra
  "com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.0-M3",
  // lift-json for JSON serialization of responses
  "net.liftweb" % "lift-json_2.11" % "2.6.2"
)

assembleArtifact in assemblyPackageScala := false // We don't need the Scala library, Spark already includes it

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

ivyScala := ivyScala.value map {
  _.copy(overrideScalaVersion = true)
}
fork in run := true
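One thing the article does not show: the assembly settings above come from the sbt-assembly plugin, so it has to be enabled for the build to load. A minimal project/plugins.sbt might look like this (the version is an assumption; use whichever recent release matches your sbt):

// project/plugins.sbt -- provides the `assembly` task and MergeStrategy used above.
// The version below is an assumption; any recent sbt-assembly release should work.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")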

Database Access Layer:

And your database access layer should look like this:

package com.knoldus.factories

import com.knoldus.domain.User
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._

import scala.util.Try

/**
  * Created by shivansh on 9/5/16.
  */
trait DatabaseAccess {

  import Context._

  // Persist the user by turning it into a one-element RDD and saving it to Cassandra.
  def create(user: User): Boolean =
    Try(sc.parallelize(Seq(user)).saveToCassandra(keyspace, tableName)).toOption.isDefined

  // Fetch matching rows; the where clause is parameterized rather than
  // string-interpolated so the id cannot produce malformed (or injected) CQL.
  def retrieve(id: String): Option[Array[User]] =
    Try(sc.cassandraTable[User](keyspace, tableName).where("id = ?", id).collect()).toOption
}

object DatabaseAccess extends DatabaseAccess


object Context {
  val config = ConfigFactory.load()
  val url = config.getString("cassandra.url")
  val sparkConf: SparkConf = new SparkConf().setAppName("Spark-cassandra-akka-rest-example").setMaster("local[4]")
    .set("spark.cassandra.connection.host", url)
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val sc = spark.sparkContext
  val keyspace = config.getString("cassandra.keyspace")
  val tableName = config.getString("cassandra.tableName")
}
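The code above assumes a User domain class and a few configuration keys that the article never shows. Here is a minimal sketch of both; the field names are inferred from the routes (id, name, email), and the configuration values are placeholders for your own environment:

package com.knoldus.domain

// Sketch of the assumed domain class; the fields must line up with the
// columns of the Cassandra table the connector reads and writes.
case class User(id: String, name: String, email: String)

And the matching src/main/resources/application.conf read by Context:

# Placeholder values; point them at your own cluster, keyspace, and table.
cassandra.url = "127.0.0.1"
cassandra.keyspace = "test"
cassandra.tableName = "user"

The keyspace and table must already exist in Cassandra; an example schema matching the User fields (again, an assumption) would be:

CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS test.user (id text PRIMARY KEY, name text, email text);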

Service Layer:

Now your routing file should look like this:

package com.knoldus.routes

import java.util.UUID

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.ActorMaterializer
import com.knoldus.domain.User
import com.knoldus.factories.DatabaseAccess
import net.liftweb.json._
import java.util.Date
import net.liftweb.json.Extraction._


trait SparkService extends DatabaseAccess {

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer
  // lazy, so the logger is created only after the concrete ActorSystem is supplied
  lazy val logger = Logging(system, getClass)


  // Handles exceptions that escape the routes and responds with a 500.
  implicit def myExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case e: ArithmeticException =>
        extractUri { uri =>
          complete(HttpResponse(StatusCodes.InternalServerError, entity = "Data is not persisted and something went wrong"))
        }
    }

  implicit val formats: Formats = new DefaultFormats {
    override val typeHintFieldName = "type"
    override val typeHints = ShortTypeHints(List(classOf[String], classOf[Date]))
  }

  val sparkRoutes: Route = {
    get {
      path("create" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
        complete {
          val documentId = "user::" + UUID.randomUUID().toString
          try {
            val user = User(documentId, name, email)
            val isPersisted = create(user)
            if (isPersisted) {
              HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
            } else {
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
          }
        }
      }
    } ~ path("retrieve" / "id" / Segment) { (id: String) =>
      get {
        complete {
          try {
            val idAsRDD: Option[Array[User]] = retrieve(id)
            idAsRDD match {
              case Some(data) => HttpResponse(StatusCodes.OK, entity = data.headOption.fold("")(x => compact(render(decompose(x)))))
              case None => HttpResponse(StatusCodes.InternalServerError, entity = "Data is not fetched and something went wrong")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $id")
          }
        }
      }
    }
  }
}
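Nothing above actually starts an HTTP server: SparkService only declares the ActorSystem and materializer it needs. A minimal bootstrap might look like the following sketch (the object name, host, and port are assumptions, not part of the original article):

package com.knoldus

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.knoldus.routes.SparkService

object StartApplication extends App with SparkService {
  // Supply the implicits that SparkService declares as abstract.
  implicit val system = ActorSystem("cassandra-spark-akka-http")
  implicit val materializer = ActorMaterializer()
  // Bind the routes for local testing; host and port are assumptions.
  Http().bindAndHandle(sparkRoutes, "localhost", 8080)
}

Once it is running, you can exercise the service from a browser or curl: hit /create/name/<name>/email/<email> to persist a user, then /retrieve/id/<documentId> with the id returned by the first call to read it back.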



Topics:
scala, akka, spark, cassandra, rest api, rest api services, spark 2.0.0, connectors

Published at DZone with permission of Shivansh Srivastava, DZone MVB. See the original article here.

