Persistent Read-Side Views in Lagom

Lagom provides support to build read-side views of persistent data that can be used for querying. Read on to learn how to query Lagom’s microservices to retrieve data.

In this article, we will discuss how to query Lagom’s microservices to retrieve data. It assumes familiarity with persistent entity concepts in Lagom; if you need a refresher, you can get a quick overview by going through this article on data persistence in Lagom.

Lagom handles data persistence by using persistent entities, each of which holds the state of an individual entity. To interact with an entity, however, you must know its identifier, so entities alone cannot answer queries such as "find the user with a given name". Hence, Lagom provides support for building read-side views of the persistent data that can be used for querying.

This separation of the write side and the read side of persistent data is often referred to as the CQRS (Command Query Responsibility Segregation) pattern.
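To make the separation concrete, here is a minimal, Lagom-free sketch in plain Scala. All names in it (CqrsSketch, WriteSide, UsersByNameView) are hypothetical and exist only for illustration; the in-memory maps stand in for the event journal and the read-side table.

```scala
object CqrsSketch {
  // Hypothetical event, standing in for an event emitted by a persistent entity.
  final case class UserCreated(id: String, name: String, age: Int)

  // Write side: events are stored per entity id, so lookups require the id.
  final class WriteSide {
    private var journal = Map.empty[String, Vector[UserCreated]]

    def persist(event: UserCreated): Unit =
      journal = journal.updated(event.id, journal.getOrElse(event.id, Vector.empty) :+ event)

    def eventsFor(id: String): Vector[UserCreated] =
      journal.getOrElse(id, Vector.empty)
  }

  // Read side: a view built only from events, shaped for the query we want.
  final class UsersByNameView {
    private var byName = Map.empty[String, UserCreated]

    def handle(event: UserCreated): Unit =
      byName = byName.updated(event.name, event)

    def findByName(name: String): Option[UserCreated] =
      byName.get(name)
  }
}
```

The write side can only be addressed by entity id, while the view answers a query by name without knowing any id, which is the role the read side plays in the rest of this article.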

Read-Side Processor

The read side can be implemented using any database. For now, we will be using Cassandra to understand its concepts.

One thing to keep in mind is that the read side should only be updated in response to events received from persistent entities.

This is done by building a ReadSideProcessor, which transforms the events generated by the persistent entities into database tables that can be queried. At the same time, it uses offsets to keep track of which events it has handled.

Each event produced by a persistent entity has an offset. When a read-side processor first starts, it loads the offset of the last event it processed, and whenever it processes an event, it stores that event's offset.
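The offset bookkeeping described above can be sketched in plain Scala as follows. This is a simplified model rather than Lagom's implementation: OffsetSketch, Envelope, OffsetStore, and Processor are hypothetical names, and the offset store is an in-memory map instead of a database table.

```scala
object OffsetSketch {
  // An event paired with its position in the event stream.
  final case class Envelope(offset: Long, event: String)

  // Stand-in for a durable offset store (kept in memory here, an assumption).
  final class OffsetStore {
    private var last = Map.empty[String, Long]

    def load(processorId: String): Option[Long] = last.get(processorId)

    def save(processorId: String, offset: Long): Unit =
      last = last.updated(processorId, offset)
  }

  final class Processor(id: String, store: OffsetStore) {
    var handled: Vector[String] = Vector.empty

    // Handles only events after the last stored offset and
    // records the offset of every event it handles.
    def run(stream: Seq[Envelope]): Unit = {
      val from = store.load(id)
      stream.filter(env => from.forall(env.offset > _)).foreach { env =>
        handled = handled :+ env.event
        store.save(id, env.offset)
      }
    }
  }
}
```

A second Processor created with the same id and the same store resumes after the last saved offset, so events that were already handled are skipped after a restart.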

This is what the ReadSideProcessor class looks like:

class UserProcessor(session: CassandraSession, readSide: CassandraReadSide)(implicit ec: ExecutionContext)
  extends ReadSideProcessor[UserEvent] {

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[UserEvent] = ???

  override def aggregateTags: Set[AggregateEventTag[UserEvent]] = ???
}

We have injected the Cassandra session and Cassandra read-side support. The above class extends the ReadSideProcessor trait and we need to implement two methods: aggregateTags and buildHandler. But before we discuss them in detail, we first need to understand how we can tag the events.

Event Tags

To be consumed by the read side, events need to be tagged. Events are tagged by making them implement the AggregateEvent interface, and the tag is defined using its aggregateTag method. Here is an example of how you can tag the event:

trait UserEvent extends AggregateEvent[UserEvent] {
  override def aggregateTag: AggregateEventTagger[UserEvent] = UserEvent.Tag
}

object UserEvent {
  val numberOfShards = 4
  val Tag: AggregateEventShards[UserEvent] = AggregateEventTag.sharded[UserEvent](numberOfShards)
}
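As a rough illustration of what sharding the tag means, the following plain-Scala sketch assigns each entity to one of a fixed number of tags. It is an assumption-laden model, not Lagom's code: ShardedTagSketch, baseName, and tagFor are hypothetical, and the hashing scheme is only one plausible way to pick a shard.

```scala
object ShardedTagSketch {
  val numberOfShards = 4
  val baseName = "UserEvent"

  // Every tag name a read-side processor would need to handle.
  val allTags: Set[String] =
    (0 until numberOfShards).map(i => s"$baseName$i").toSet

  // Picks one tag per entity id, so all events of an entity share a shard.
  // floorMod keeps the result non-negative even for negative hash codes.
  def tagFor(entityId: String): String = {
    val shard = Math.floorMod(entityId.hashCode, numberOfShards)
    s"$baseName$shard"
  }
}
```

Because the tag depends only on the entity id, events of one entity always land on the same shard and keep their order, while different shards can be processed in parallel.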

Now, let’s discuss the two methods that we need to implement to create ReadSideProcessor.

1. aggregateTags

This method returns the set of all the tags that our processor will handle. Since our events are sharded, we simply return all the tags of the sharded tag defined for UserEvent.

override def aggregateTags: Set[AggregateEventTag[UserEvent]] =
  UserEvent.Tag.allTags

2. buildHandler

This method creates the ReadSideHandler that processes the events. We use the builder method of CassandraReadSide to create the handler, which handles read-side offsets automatically. The argument passed to builder is the ID of the read-side processor, which Lagom uses when it persists offsets to its offset store.

readSide.builder[UserEvent]("userEventOffset")

We also need to register two callbacks: globalPrepare and prepare.

globalPrepare

This callback is used for creating tables and preparing any data that needs to be available before read-side processing starts. It runs at least once across the whole cluster. We can create tables like this:

def createTable(): Future[Done] = {
  session.executeCreateTable(
    """
      |CREATE TABLE IF NOT EXISTS usertable(
      |id text PRIMARY KEY,
      |name text,
      |age int
      |);
    """.stripMargin)
}

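After that, we can register the globalPrepare callback in the buildHandler method by calling setGlobalPrepare on the builder.

// builder is the value returned by readSide.builder[UserEvent]("userEventOffset")
builder.setGlobalPrepare(createTable)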

Prepare

This callback is executed once per shard when the read-side processor starts up. It can be used for preparing statements in order to optimize Cassandra’s handling of them. An example of creating prepared statements for INSERT queries looks like this:

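// Holds the prepared INSERT statement; assigned once the prepare callback completes.
private var userStatement: PreparedStatement = _

def createPreparedStatements: Future[Done] = {
  for {
    userPreparedStatement <- session.prepare("INSERT INTO usertable(id, name, age) VALUES (?, ?, ?)")
  } yield {
    userStatement = userPreparedStatement
    Done
  }
}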

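This callback also needs to be registered inside the buildHandler method by calling setPrepare on the builder.

// builder is the value returned by readSide.builder[UserEvent]("userEventOffset")
builder.setPrepare(_ => createPreparedStatements)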

Event Handler

The event handlers are responsible for handling the actual events. Each one takes an event and returns a Future of a list of bound statements. An example of handling user events looks like this:

def storeUser(user: User): Future[List[BoundStatement]] = {
  val userBindStatement = userStatement.bind()
  userBindStatement.setString("id", user.id)
  userBindStatement.setString("name", user.name)
  userBindStatement.setInt("age", user.age)
  Future.successful(List(userBindStatement))
}

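This is then registered with the builder using setEventHandler. Calling build() on the builder afterwards produces the ReadSideHandler that buildHandler returns.

// builder is the value returned by readSide.builder[UserEvent]("userEventOffset")
builder.setEventHandler[UserCreated](e => storeUser(e.event.user))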
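To show how the three registrations fit together, here is a simplified stand-in for a handler builder in plain Scala. It is not the Lagom API: HandlerSketch, Builder, and run are hypothetical, callbacks return Unit instead of futures, and UserDeleted is an invented second event used only to show that unhandled event types are skipped.

```scala
object HandlerSketch {
  sealed trait UserEvent
  final case class UserCreated(name: String) extends UserEvent
  final case class UserDeleted(name: String) extends UserEvent

  // Simplified stand-in for a read-side handler builder; not the Lagom API.
  final class Builder {
    private var globalPrepare: () => Unit = () => ()
    private var prepare: String => Unit = _ => ()
    private var handler: PartialFunction[UserEvent, Unit] = PartialFunction.empty

    def setGlobalPrepare(f: () => Unit): Builder = { globalPrepare = f; this }

    def setPrepare(f: String => Unit): Builder = { prepare = f; this }

    def setEventHandler(f: PartialFunction[UserEvent, Unit]): Builder = {
      handler = handler.orElse(f)
      this
    }

    // Runs the callbacks in order: global prepare once,
    // prepare once per tag, then the handlers for each event.
    def run(tags: Seq[String], events: Seq[UserEvent]): Unit = {
      globalPrepare()
      tags.foreach(prepare)
      events.foreach(e => if (handler.isDefinedAt(e)) handler(e))
    }
  }
}
```

The point of the sketch is the ordering: tables are created before anything else, statements are prepared per tag, and only then are events dispatched to the handler registered for their type.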

Registering Your Read-Side Processor

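You need to register the read-side processor with your microservice. This is done inside your application loader, where readSide refers to Lagom's ReadSide component rather than the CassandraReadSide injected into the processor.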

readSide.register(wire[UserProcessor])

To query the Cassandra tables, you can create a method and use Cassandra SELECT statements. An example of getting user details for our application is:

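// name is passed as a bind argument instead of being interpolated into the CQL string.
// usertable is keyed by id, so filtering on name needs a secondary index on name or ALLOW FILTERING.
def getUserByName(name: String): Future[Option[User]] =
  session.selectOne("SELECT * FROM usertable WHERE name = ?", name).map { optRow =>
    optRow.map { row =>
      User(row.getString("id"), row.getString("name"), row.getInt("age"))
    }
  }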

The complete demo code is available here. Check the README.md file for instructions on running the application.

This article was originally posted on the Knoldus blog.

Topics:
database, lagom, tutorial, persistence, event tags, data processing, event handling, data retrieval, microservices, cassandra

Published at DZone with permission of
