Introducing Hazelcast Scala API With Aggregations


There's a neat new pre-release alpha of the Hazelcast Scala API on Github. Here's a look at key features, configuration, and more of the Hazelcast Scala API pre-release alpha.


A pre-release alpha version of the Scala API for Hazelcast has been released and is available on Github.

It’s a “soft” API, i.e. it extends the Java API rather than replacing it. The Scala API also adds built-in distributed aggregations and IMap join capability.

The Scala API targets version 3.6 of Hazelcast, which is currently available in EA, with a final release due early Q1 2016.

Key Features

The following is a breakdown of some of the key features of the Scala API.

Configuration
Given a configuration instance, Config or ClientConfig, you can set property values in a type-safe and unit agnostic manner.

E.g. in Java, to set the maximum wait time for graceful shutdown, you’d have to know what unit is expected. This is not necessarily hard, but it does require either remembering the unit or looking it up in the documentation or code:

conf.setProperty("hazelcast.graceful.shutdown.max.wait", "300") // <- seconds

In Scala, using import concurrent.duration._, you provide the unit instead:
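A sketch of the Scala version; the typed setter name `setGracefulShutdownMaxWait` is an assumption derived from the property name, not confirmed API:

```scala
import com.hazelcast.Scala._
import scala.concurrent.duration._

// The Duration value carries its own unit, so there is no unit convention
// to remember. NOTE: the setter name is assumed from the property name.
conf.setGracefulShutdownMaxWait(300.seconds) // or, equivalently, 5.minutes
```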


Likewise for setting socket receive buffer size, in Java:

conf.setProperty("hazelcast.socket.receive.buffer.size", "64") // <- kilobytes

And Scala:
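Along the same lines, the size can be given with an explicit unit; both the setter name and the `kilobytes` syntax below are assumptions for this sketch:

```scala
import com.hazelcast.Scala._

// The buffer size is given with an explicit unit rather than an implied
// "kilobytes". NOTE: setter name and unit syntax are assumed, not confirmed.
conf.setSocketReceiveBufferSize(64.kilobytes)
```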


Serialization

The Scala API provides a number of serializers for commonly used Scala classes, which can be registered for better performance:

import com.hazelcast.Scala._
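Registration goes through the SerializationConfig. A sketch, assuming the bundled serializers are grouped in a `Defaults` object with a `register` method (treat both names as assumptions):

```scala
import com.hazelcast.Scala.serialization

// Register the bundled Scala serializers on the member (or client) config.
// NOTE: `Defaults` and `register` are assumed names for this sketch.
serialization.Defaults.register(conf.getSerializationConfig)
```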


In an earlier blog post, I described how a Java enum can be used as a container for the serialization type classes ByteArraySerializer and StreamSerializer. The Java implementation requires some boilerplate that we can avoid in Scala, and the same pattern can be provided as a default implementation. Given a Person class:

package foo.domain

import java.time.LocalDate

case class Person(name: String, birthday: LocalDate)

And a serializer:

package foo.serialization

import java.time.LocalDate

import com.hazelcast.Scala.serialization._
import com.hazelcast.nio.{ ObjectDataInput, ObjectDataOutput }

import foo.domain.Person

object MySerializers extends SerializerEnum {

  val PersonSer = new StreamSerializer[Person] {
    def write(out: ObjectDataOutput, person: Person): Unit = {
      out.writeUTF(person.name)
      out.writeShort(person.birthday.getYear)
      out.writeByte(person.birthday.getMonthValue)
      out.writeByte(person.birthday.getDayOfMonth)
    }
    def read(inp: ObjectDataInput): Person = {
      val name = inp.readUTF()
      val year = inp.readShort()
      val month = inp.readByte()
      val day = inp.readByte()
      Person(name, LocalDate.of(year, month, day))
    }
  }
}

We can chain serializer containers (SerializerEnum objects), to logically separate serializers, while maintaining serializer id sequencing, by indirect extension:

object OtherSerializers extends SerializerEnum(MySerializers) {
  // More serializers...
}
This ensures that all serializers have a unique, monotonically increasing, serializer id.
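To put the chained enums to use, each can be registered against the config; this sketch assumes SerializerEnum exposes a `register(SerializationConfig)` method like the bundled defaults do:

```scala
// Hypothetical registration; assumes SerializerEnum provides `register`.
MySerializers.register(conf.getSerializationConfig)
OtherSerializers.register(conf.getSerializationConfig)
```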

New Instance

In Scala, you get the HazelcastInstance directly from the configuration object:

val member = conf.newInstance()
// or ClientConfig:
val client = clientConf.newClient()

Non-Blocking Operations

The Scala API puts all non-blocking operations into a separate interface, available by calling .async. All methods return scala.concurrent.Future and are adaptations of the async methods in the Java API, along with some convenience methods for performing in-place updates. For IMap, these are available:

  1. def update(key: K)(updateIfPresent: V => V): Future[Boolean]
  2. def updateAndGet(key: K)(updateIfPresent: V => V): Future[Option[V]]
  3. def upsert(key: K, insertIfMissing: V)(updateIfPresent: V => V): Future[UpsertResult]
  4. def upsertAndGet(key: K, insertIfMissing: V)(updateIfPresent: V => V): Future[V]

This makes it syntactically easy to, for example, implement the equivalent of an AtomicDouble:

val doubles = hz.getMap[String, Double]("atomic-doubles")
val delta = 1.3333
// Insert value if "gauge" unknown, otherwise add value:
val gaugeValue: Future[Double] = doubles.upsertAndGet("gauge", delta)(_ + delta)
// Print updated value:
gaugeValue.foreach(v => println(s"Gauge: $v"))

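To make the upsert contract concrete without a Hazelcast cluster, here is a plain single-JVM sketch over a ConcurrentHashMap; the `upsert` helper and the `UpsertOutcome` type are stand-ins invented for this illustration, not the library's implementation:

```scala
import java.util.concurrent.ConcurrentHashMap

sealed trait UpsertOutcome
case object WasInserted extends UpsertOutcome
case object WasUpdated extends UpsertOutcome

// Single-JVM sketch of the upsert contract: insert if the key is missing,
// otherwise retry a compare-and-swap with the update function until it wins.
// (Assumes no concurrent removal of the key.)
def upsert[K, V](map: ConcurrentHashMap[K, V], key: K, insertIfMissing: V)(
    updateIfPresent: V => V): UpsertOutcome =
  Option(map.putIfAbsent(key, insertIfMissing)) match {
    case None => WasInserted
    case Some(existing) =>
      var current = existing
      while (!map.replace(key, current, updateIfPresent(current)))
        current = map.get(key)
      WasUpdated
  }

val doubles = new ConcurrentHashMap[String, Double]
val delta = 1.3333
val first = upsert(doubles, "gauge", delta)(_ + delta)
val second = upsert(doubles, "gauge", delta)(_ + delta)
println(s"$first, then $second") // WasInserted, then WasUpdated
```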
Distributed Computing

The Scala API for Hazelcast makes it easy to do distributed compute queries and aggregations, using the familiar syntax from the Scala collections.

It’s currently only implemented for IMap, but may be provided for MultiMap and List/Set in the future.


Given this trait:

import java.time._
import scala.concurrent.Future

trait Stats {
  /**
   * Calculate the mean max temperature, per month, for
   * a given year.
   * @param forYear The year
   */
  def monthlyMaxMeanTemp(forYear: Year): Future[Map[Month, Float]]
}

Let’s say we’d like to print the mean daily max by month, for Milan, for the previous year:

  val milanStats: Stats = ???

  val lastYearMonthlyMeanMax = milanStats.monthlyMaxMeanTemp(Year.now minusYears 1)

  lastYearMonthlyMeanMax.foreach { byMonth =>
    byMonth.toSeq.sortBy(_._1).foreach {
      case (month, meanMax) => println(s"$month: Mean max: $meanMax C")
    }
  }
We can store weather stats in one or more Hazelcast IMaps, using this class to hold the values:

case class WeatherStats(
  minTempC: Float,
  maxTempC: Float,
  rainMm: Float,
  dewPointC: Float,
  humidity: Float)

If each region holds daily weather stats, we can provide a Hazelcast implementation of Stats, like this:

import java.time.{ LocalDate, Month, Year }
import scala.concurrent.{ ExecutionContext, Future }

import com.hazelcast.Scala._
import com.hazelcast.core.HazelcastInstance

class HzStats(hz: HazelcastInstance, region: String)(implicit ec: ExecutionContext)
    extends Stats {

  private val weather = hz.getMap[LocalDate, WeatherStats](region)

  def monthlyMaxMeanTemp(forYear: Year): Future[Map[Month, Float]] = {
    val begin = forYear.atMonth(Month.JANUARY).atDay(1)
    val end = forYear.atMonth(Month.DECEMBER).atEndOfMonth
    val weatherForYear = weather.filter(where.key.between(begin, end))
    val maxTempsByMonth = weatherForYear.groupBy(
      e => Month.from(e.key), // Group key: Month
      _.value.maxTempC) // Group values: maxTempC
    val meanMaxTempByMonth = maxTempsByMonth.mean() // Get the average per group
    meanMaxTempByMonth
  }
}

We can then instantiate milanStats:

  val hz: HazelcastInstance = ???
  val milanStats: Stats = new HzStats(hz, "milan")

The mean calculation is one of many built-in aggregations provided by hazelcast-scala. More will presumably be added in the future, and submissions are welcome, provided they are of general use.

Built-in Aggregations

The following aggregations are currently built in:

All types:

  1. distinct()
  2. distribution()
  3. count()
  4. mode()

Ordering types:

  1. max()
  2. min()
  3. minMax()
  4. medianValues()

Numeric types:

  1. sum()
  2. product()
  3. mean()
  4. range()
  5. median()
  6. variance()

Custom Aggregations

Custom aggregations can be provided by implementing the Aggregator trait and calling .submit() on the DDS (Distributed Data Set), which is provided by an implicit conversion from IMap.

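The exact shape of the Aggregator trait isn't shown here, so as a self-contained illustration of the fold/combine structure such an aggregator has, here is a plain-Scala sketch; `SimpleAggregator` and `RootMeanSquare` are stand-ins invented for this example, not the library's interface:

```scala
// Stand-in for the idea behind a distributed aggregator (NOT the library's
// exact Aggregator interface): each node folds its local entries into an
// accumulator, and the partial accumulators are then combined.
trait SimpleAggregator[T, Acc] {
  def init: Acc
  def fold(acc: Acc, value: T): Acc
  def combine(x: Acc, y: Acc): Acc
}

// Example: root-mean-square over Double values, tracking (sum of squares, count).
class RootMeanSquare extends SimpleAggregator[Double, (Double, Long)] {
  def init = (0d, 0L)
  def fold(acc: (Double, Long), value: Double) = (acc._1 + value * value, acc._2 + 1)
  def combine(x: (Double, Long), y: (Double, Long)) = (x._1 + y._1, x._2 + y._2)
  def finish(acc: (Double, Long)): Double = math.sqrt(acc._1 / acc._2)
}

// Simulate two nodes folding their own partitions, then combining:
val rms = new RootMeanSquare
val node1 = List(3d, 4d).foldLeft(rms.init)(rms.fold)
val node2 = List(0d).foldLeft(rms.init)(rms.fold)
val result = rms.finish(rms.combine(node1, node2)) // sqrt(25/3)
println(result)
```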

Aggregations are executed locally on every node that holds data within the provided filter. Since Hazelcast holds data in memory, this allows for very fast and memory-efficient aggregations. Informal testing reveals a performance increase of 5-15x over the existing Map/Reduce framework.

What Else

For more complete and up-to-date examples, visit the Github Wiki.


Published at DZone with permission of Nils Kilden-Pedersen, DZone MVB. See the original article here.

