Exposing ElasticSearch Index Using Scala Akka HTTP
Akka HTTP is a convenient and powerful routing DSL for expressing web services that can be used with Scala to integrate a service with ElasticSearch.
This article demonstrates how to use Akka HTTP with Scala to integrate a service with ElasticSearch.
The example is a user service that exposes a REST endpoint at the URI /users.
The integration between Akka and ElasticSearch uses the Elastic4s library.
Implementation
The following classes are used to represent the user model.
sealed trait User

case class UserDetail(_id: String,
                      firstName: String,
                      lastName: String,
                      daily_rate: Int,
                      employee_type: String,
                      age: Int) extends User

case class UserList(users: List[UserDetail])
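The route and the test in this article call toJson and convertTo[UserList], which require spray-json formats that are not shown. The following is a minimal sketch of what they might look like, assuming spray-json is on the classpath. The object name UserJsonProtocol is hypothetical, and the model classes are repeated only to keep the sketch self-contained.

```scala
import spray.json._

// Model classes repeated from the article so the sketch compiles on its own.
sealed trait User
case class UserDetail(_id: String, firstName: String, lastName: String,
                      daily_rate: Int, employee_type: String, age: Int) extends User
case class UserList(users: List[UserDetail])

// Hypothetical protocol object (the name is an assumption, not from the article).
object UserJsonProtocol extends DefaultJsonProtocol {
  // jsonFormatN derives a format from the case class constructor and field names.
  implicit val userDetailFormat: RootJsonFormat[UserDetail] = jsonFormat6(UserDetail.apply)
  implicit val userListFormat: RootJsonFormat[UserList] = jsonFormat1(UserList.apply)
}
```

With import UserJsonProtocol._ in scope, a UserList can be turned into JSON with toJson and read back with convertTo[UserList].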
An Akka HTTP route can be created in the following way:
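import akka.actor.ActorSystem
import akka.event.LoggingAdapter
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import com.sksamuel.elastic4s.TcpClient
import com.typesafe.config.Config
import spray.json._

import scala.concurrent.ExecutionContextExecutor

trait UserRoute {
  // Supplied by whatever class mixes in this trait (the server or a test).
  implicit val system: ActorSystem
  implicit def executor: ExecutionContextExecutor
  implicit val materializer: ActorMaterializer
  implicit val client: TcpClient
  implicit def config: Config
  val logger: LoggingAdapter

  val route: Route = {
    logRequestResult("user-service") {
      path("users") {
        get {
          complete {
            // toJson needs an implicit spray-json format for UserList in scope.
            UserSvc.getUsers(client)
              .map[ToResponseMarshallable] {
                case Right(userList) => OK -> userList.toJson
                case Left(ex) =>
                  // Pass the throwable first so the stack trace is logged.
                  logger.error(ex, ex.getMessage)
                  StatusCodes.InternalServerError
              }
          }
        }
      }
    }
  }
}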
The user service (UserSvc above), which retrieves the list of users from ElasticSearch, can be implemented as follows. It queries ElasticSearch for all employees whose employee_type is contractor and sorts the results by age.
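import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.TcpClient
import com.typesafe.config.Config

import scala.concurrent.Future

object UserSvc {
  def getUsers(client: TcpClient)
              (implicit system: ActorSystem, materializer: ActorMaterializer,
               config: Config): Future[Either[Throwable, UserList]] = {
    // Run the Future callbacks below on the actor system's dispatcher.
    import system.dispatcher

    // Match contractors only and sort the hits by age.
    val response = client.execute {
      search(config.getString("elasticSearch.index_name")) limit config.getInt("service.max_list_record_count") query {
        boolQuery()
          .must(matchQuery("employee_type", "contractor"))
      } sortBy (
        fieldSort("age")
      )
    }

    response
      .map[Either[Throwable, UserList]] { sr =>
        // safeTo needs an implicit HitReader[UserDetail] in scope.
        val hits = sr.safeTo[UserDetail]
        // Fail the whole list if any single hit could not be converted.
        hits.collectFirst { case Left(err) => err } match {
          case Some(err) => Left(new RuntimeException(err))
          case None      => Right(UserList(hits.collect { case Right(user) => user }.toList))
        }
      }
      // recover returns a new Future, so its result must be the returned value.
      .recover { case cause => Left(cause) }
  }
}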
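Two details of the service are easy to get wrong: collapsing a sequence of per-hit Either values into a single result, and using recover on a Future. The sketch below isolates both with the standard library only. The names EitherCollapseSketch, collapse and safely are hypothetical, and String is assumed as the error type purely for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EitherCollapseSketch {
  // Returns the first Left found, otherwise all Right values in their original order.
  def collapse[A](results: Seq[Either[String, A]]): Either[Throwable, List[A]] =
    results.collectFirst { case Left(err) => err } match {
      case Some(err) => Left(new RuntimeException(err))
      case None      => Right(results.collect { case Right(value) => value }.toList)
    }

  // recover does not modify the Future it is called on; it returns a new one.
  // The caller must use the returned Future to see the recovered value.
  def safely[A](f: Future[Either[Throwable, A]]): Future[Either[Throwable, A]] =
    f.recover { case cause => Left(cause) }
}
```

Note that the function passed to recover produces a plain value (here a Left), not another Future. When the fallback itself is asynchronous, recoverWith is the variant that takes a Future.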
Test
Akka HTTP provides a comprehensive test kit with good support for testing routes. A test can be written as shown below. To keep the example short, only a single test case is included.
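import akka.event.NoLogging
import akka.http.scaladsl.model.ContentTypes
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.testkit.{ElasticMatchers, ElasticSugar}
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.scalatest.FlatSpec
import spray.json._

class UserSpec extends FlatSpec
  with ScalatestRouteTest
  with UserRoute
  with ElasticSugar
  with ElasticMatchers {

  // Port of the embedded ElasticSearch node started by ElasticSugar.
  val elasticTestingPort = node.ipAndPort.dropWhile(_ != ':').drop(1).toInt

  override val logger = NoLogging
  override val config = ConfigFactory.load("application_test.conf")
    .withValue("elasticSearch.port", ConfigValueFactory.fromAnyRef(elasticTestingPort))

  val index_name = "user_index"
  val type_name = "usertest"

  // Index two contractors and one permanent employee.
  client.execute {
    bulk(
      indexInto(index_name, type_name)
        .fields("firstName" -> "Foo",
          "lastName" -> "Bar",
          "employee_type" -> "contractor",
          "daily_rate" -> 500,
          "age" -> 90)
        id "193",
      indexInto(index_name, type_name)
        .fields("firstName" -> "Jacob",
          "lastName" -> "duck",
          "employee_type" -> "permanent",
          "daily_rate" -> 0,
          "age" -> 53)
        id "147",
      indexInto(index_name, type_name)
        .fields("firstName" -> "Usher",
          "lastName" -> "Bay",
          "employee_type" -> "contractor",
          "daily_rate" -> 610,
          "age" -> 19)
        id "52"
    )
  }.await

  // Wait until all three documents are searchable.
  blockUntilCount(3, index_name, type_name)

  it should "Retrieve list of users" in {
    Get("/users") ~> route ~> check {
      status shouldBe OK
      contentType shouldBe ContentTypes.`application/json`
      val responseString = responseAs[String]
      // convertTo needs an implicit spray-json format for UserList in scope.
      val userList = JsonParser(responseString).convertTo[UserList]
      // Only the two contractors are returned, youngest first.
      userList.users.size shouldBe 2
      userList.users(0).firstName shouldBe "Usher"
      userList.users(1).firstName shouldBe "Foo"
    }
  }
}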
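The port extraction at the top of the spec is terse, so here it is in isolation as a small standard-library sketch. It assumes the node reports its address in a host:port form such as 127.0.0.1:9300. That format, and the names PortParsingSketch and portOf, are assumptions for illustration.

```scala
object PortParsingSketch {
  // Drops everything up to the first ':', then the ':' itself, and parses the rest.
  // Throws NumberFormatException if no numeric port follows the colon.
  def portOf(ipAndPort: String): Int =
    ipAndPort.dropWhile(_ != ':').drop(1).toInt
}
```

Because the split happens at the first colon, this approach would not work for an unbracketed IPv6 address.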
Below is the configuration file used for this example:
akka {
  loglevel = DEBUG
  http {
    client {
      idle-timeout = 200 ms
    }
  }
}
http {
  interface = "0.0.0.0"
  port = 9000
}
elasticSearch {
  host = "localhost"
  port = "8080"
  uri = "/_cluster/health"
  index_name = "user_index"
}
service {
  max_list_record_count = 20
}
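As a rough illustration of how these settings reach the code, the sketch below reads a few of them with the Typesafe Config library, which is what the Config type in the service refers to. The configuration is parsed from an inline string so the sketch does not depend on a file being on the classpath. The object name ConfigSketch and its field names are hypothetical.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigSketch {
  // Inline copy of part of the article's configuration.
  val config: Config = ConfigFactory.parseString(
    """
      |http {
      |  interface = "0.0.0.0"
      |  port = 9000
      |}
      |elasticSearch {
      |  host = "localhost"
      |  port = "8080"
      |  index_name = "user_index"
      |}
      |service {
      |  max_list_record_count = 20
      |}
    """.stripMargin)

  // The same lookups the service performs when building its search request.
  val indexName: String = config.getString("elasticSearch.index_name")
  val maxRecords: Int = config.getInt("service.max_list_record_count")

  // The port is quoted in the file; getInt still works because the library
  // converts numeric strings to numbers when a number is requested.
  val elasticPort: Int = config.getInt("elasticSearch.port")
}
```

In the application itself, ConfigFactory.load() would normally pick these values up from the configuration file on the classpath instead of an inline string.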