
Exposing ElasticSearch Index Using Scala Akka HTTP

Akka HTTP is a convenient and powerful routing DSL for expressing web services that can be used with Scala to integrate a service with ElasticSearch.

This article demonstrates how to use Akka HTTP with Scala to integrate a service with ElasticSearch.

The example is a user service that exposes a REST endpoint at the URI /users.

The integration from Akka to ElasticSearch is done using the Elastic4s library. 

Implementation

The following classes are used to represent the user model.

sealed trait User
case class UserDetail(_id: String,  firstName : String,
                                    lastName: String,
                                    daily_rate: Int,
                                    employee_type : String,
                                    age : Int) extends User
case class UserList(users : List[UserDetail])
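
The route and the test in this article call toJson and convertTo on these classes, which suggests spray-json is the JSON library in use, although the article does not show the formats. A minimal sketch of what they could look like follows; the object name UserJsonProtocol is hypothetical, and completing a route with the resulting JSON would presumably also need akka-http-spray-json's SprayJsonSupport in scope.

```scala
import spray.json._

// Same shape as the model classes above.
sealed trait User
case class UserDetail(_id: String, firstName: String, lastName: String,
                      daily_rate: Int, employee_type: String, age: Int) extends User
case class UserList(users: List[UserDetail])

// Hypothetical protocol object; the name is an assumption, not from the article.
object UserJsonProtocol extends DefaultJsonProtocol {
  // jsonFormatN derives a format from the case class field names.
  implicit val userDetailFormat: RootJsonFormat[UserDetail] = jsonFormat6(UserDetail)
  implicit val userListFormat: RootJsonFormat[UserList] = jsonFormat1(UserList)
}

object UserJsonExample {
  import UserJsonProtocol._

  val users: UserList =
    UserList(List(UserDetail("52", "Usher", "Bay", 610, "contractor", 19)))

  // Serialize to a JSON string and parse it back into the model.
  val json: String = users.toJson.compactPrint
  val roundTripped: UserList = json.parseJson.convertTo[UserList]

  def main(args: Array[String]): Unit = println(json)
}
```

Importing UserJsonProtocol._ wherever toJson or convertTo is called brings the formats into implicit scope.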

An Akka HTTP route can be created in the following way:

trait UserRoute {
  implicit val system: ActorSystem
  implicit def executor: ExecutionContextExecutor
  implicit val materializer : ActorMaterializer

  implicit val client : TcpClient
  implicit def config: Config
  val logger: LoggingAdapter

  val route : Route = {
    logRequestResult("user-service") {
      path("users") {
        get {
          complete {
            UserSvc.getUsers(client)
              .map[ToResponseMarshallable] {
              case Right(userList) => OK -> userList.toJson
              case Left(ex) => {
                logger.error(ex.getMessage, ex)
                StatusCodes.InternalServerError
              }
            }
          }
        }
      }
    }
  }
}

The user service (UserSvc above) retrieves the list of users from ElasticSearch. It queries ElasticSearch for all employees whose employee_type is contractor and sorts the results by age. A hit that cannot be converted to a UserDetail is reported as a Left.

object UserSvc {
  def getUsers(client: TcpClient)
              (implicit system: ActorSystem, materializer: ActorMaterializer,
                config: Config): Future[Either[Throwable, UserList]] = {

    val response = client.execute {
      search(config.getString("elasticSearch.index_name")) limit config.getInt("service.max_list_record_count") query {
        boolQuery()
          .must(matchQuery("employee_type", "contractor"))
      } sortBy (
        fieldSort("age")
        )
    }

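    // Future.map and recover need an ExecutionContext; use the actor system's dispatcher.
    import system.dispatcher

    response
      .map[Either[Throwable, UserList]] { sr =>
        // safeTo yields one Either per hit; a Left means that hit failed to deserialize.
        val results = sr.safeTo[UserDetail]
        results.collectFirst { case Left(ex) => ex } match {
          case Some(ex) => Left(new RuntimeException(ex))
          case None     => Right(UserList(results.collect { case Right(user) => user }.toList))
        }
      }
      .recover {
        // A failed Future (for example, a connection error) becomes a Left
        // instead of propagating as a failed Future.
        case cause => Left(cause)
      }
  }
}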
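
The service collapses one Either per search hit into a single Either for the whole result. The same idea can be sketched with the standard library alone; the object and method names below are illustrative and are not part of Elastic4s or the article's code.

```scala
object EitherSequence {
  // Collapses a sequence of Either values into a single Either:
  // the leftmost Left wins; otherwise all Right values are collected in order.
  def sequence[E, A](results: Seq[Either[E, A]]): Either[E, List[A]] =
    results.foldRight[Either[E, List[A]]](Right(Nil)) {
      case (Left(e), _)             => Left(e)
      case (Right(a), Right(rest))  => Right(a :: rest)
      case (Right(_), left)         => left
    }

  def main(args: Array[String]): Unit = {
    val allGood = sequence(Seq[Either[String, Int]](Right(1), Right(2)))
    val oneBad  = sequence(Seq[Either[String, Int]](Right(1), Left("bad hit"), Left("other")))
    println(allGood) // Right(List(1, 2))
    println(oneBad)  // Left(bad hit)
  }
}
```

Pattern matching is used instead of a for-comprehension so the sketch does not depend on Either being right-biased, which is only the case from Scala 2.12 onward.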

Test

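Akka HTTP provides a comprehensive test kit with good support for testing routes. The test can be written as follows. It indexes three users, two of them contractors, and checks that the endpoint returns only the contractors, sorted by age. Only a single test is included to keep the example short.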

class UserSpec extends FlatSpec
     with ScalatestRouteTest
     with UserRoute
     with ElasticSugar
     with ElasticMatchers {

     val elasticTestingPort = node.ipAndPort.dropWhile(_ != ':').drop(1).toInt
     override val logger = NoLogging
     override val config = ConfigFactory.load("application_test.conf")
       .withValue("elasticSearch.port", ConfigValueFactory.fromAnyRef(elasticTestingPort))
     val index_name = "user_index"
     val type_name = "usertest"

     client.execute {
       bulk(
         indexInto(index_name, type_name)
           .fields("firstName" -> "Foo",
                   "lastName" -> "Bar",
                   "employee_type" -> "contractor",
                   "daily_rate" -> 500,
                   "age" -> 90)
           id "193",
         indexInto(index_name, type_name)
           .fields("firstName" -> "Jacob",
                   "lastName" -> "duck",
                   "employee_type" -> "permanent",
                   "daily_rate" -> 0,
                   "age" -> 53)
           id "147",
         indexInto(index_name, type_name)
           .fields("firstName" -> "Usher",
                   "lastName" -> "Bay",
                   "employee_type" -> "contractor",
                   "daily_rate" -> 610,
                   "age" -> 19)
           id "52"
       )
     }.await

     blockUntilCount(3, index_name, type_name)

     it should "Retrieve list of users" in {
       Get(s"/users") ~> route ~> check {
         status shouldBe OK
         contentType shouldBe ContentTypes.`application/json`
         val responseString = responseAs[String]
         val userList = JsonParser(responseString).convertTo[UserList]
         userList.users.size shouldBe 2
         userList.users(0).firstName shouldBe "Usher"
         userList.users(1).firstName shouldBe "Foo"
       }
     }
   }

Below is the configuration file used for this example:

akka {
  loglevel = DEBUG
  http {
    client {
      idle-timeout = 200 ms
    }
  }
}

http {
  interface = "0.0.0.0"
  port = 9000
}

elasticSearch {
  host = "localhost"
  port = "8080"
  uri = "/_cluster/health"
  index_name = "user_index"
}

service {
  max_list_record_count = 20
}
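
The service code reads elasticSearch.index_name and service.max_list_record_count from this configuration. As a rough sketch of how those lookups behave with the Typesafe Config library, the example below parses an inline copy of the relevant settings instead of loading a file; the object name ConfigExample is hypothetical.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigExample {
  // Inline copy of the relevant settings so the sketch runs without a file on the classpath.
  val config: Config = ConfigFactory.parseString(
    """
      |elasticSearch {
      |  host = "localhost"
      |  index_name = "user_index"
      |}
      |service {
      |  max_list_record_count = 20
      |}
    """.stripMargin)

  // Nested keys are addressed with dot-separated paths.
  val indexName: String = config.getString("elasticSearch.index_name")
  val maxRecords: Int = config.getInt("service.max_list_record_count")

  def main(args: Array[String]): Unit =
    println(s"index=$indexName, limit=$maxRecords")
}
```

In the running service the same paths would be resolved against the file loaded by ConfigFactory.load, as the test does with application_test.conf.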
