Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Data Persistence in Lagom

DZone's Guide to

Data Persistence in Lagom

Are you finding it difficult to understand Lagom persistence? Are the concepts of PersistentEntity and initialState driving you insane? We're here to help.

· Database Zone
Free Resource

Read why times series is the fastest growing database category.

In this blog, we will learn about Lagom persistence with the help of a simple application. We'll also discuss its theoretical aspects.

Before we begin, make sure you know about event sourcing and CQRS. You can read about them in more detail here.

Choosing a Database

When we create any microservice, or any service in general, one of the biggest tasks is to manage data persistence. Lagom supports various databases for doing this task. By default, Lagom uses Cassandra to persist data. Tables that are required to store data are saved in Cassandra keyspaces.
So, for now, we will be using Cassandra for storing our data. Our service basically creates a user on request and stores the corresponding details in the database.

To use Cassandra, you need to add the following to your project's build.sbt:

libraryDependencies += lagomScaladslPersistenceCassandra

Lagom requires keyspace configuration for three internal components: journal, snapshot, and offset.

Journals store serialized events, snapshots are automatically saved after a configured number of persisted events for faster recovery, and the offset store provides read-side support.

Each microservice should have a unique keyspace name so that the tables of different services do not conflict with each other. However, you can use same keyspace for all of these components within one service.

To configure keyspace names, you need to add the following in your service implementations' application.conf file:

play.application.loader = com.knoldus.user.impl.service.UserServiceLoader

user.cassandra.keyspace = userdatabase

cassandra-journal.keyspace = ${user.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${user.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${user.cassandra.keyspace}

By default, the Cassandra server is started on port 4000. However, You can disable the embedded Cassandra server by adding the following in your build.sbt and you can use the external Cassandra running on your localhost.

lagomCassandraEnabled in ThisBuild := false
lagomUnmanagedServices in ThisBuild := Map("cas_native" -> "http://localhost:9042")

Persistent Entity

Lagom's persistence can be handled by defining PersistentEntity. Each instance of the entity has a stable entity identifier through which it can be accessed from service implementation or anywhere in the cluster. It's run by an actor, and the state is durable using event sourcing.
To use Lagom persistence, you need to define the PersistentEntity abstract class and override the abstract type members and method.

class UserEntity extends PersistentEntity

Three absrtact type members — CommandEvent, and State — must be defined by the subclass.

override type Command = UserCommand[_]
override type Event = UserEvent
override type State = UserState

Command

You can interact with PersistentEntity by sending command messages to it. Commands are instructions to do something, like create a user account, fetch user details, etc. Each command must implement the PersistentEntity.ReplyType interface to define the reply type. Here is an example of how you need to define the commands for your application.

trait UserCommand[R] extends ReplyType[R]

case class CreateUserCommand(user: User) extends UserCommand[Done]

Event

A command may cause changes to the entity state, and those changes are stored as events. Events are the immutable facts of things that have happened, like an account being created or updated. An example of defining an event is given below.

sealed trait UserEvent extends AggregateEvent[UserEvent] {
  override def aggregateTag: AggregateEventTagger[UserEvent] = UserEvent.Tag
}
object UserEvent {
  val Tag: AggregateEventTag[UserEvent] = AggregateEventTag[UserEvent]
}
case class UserCreated(user: User) extends UserEvent

State

State is the condition that the entity is in at a specific instance. Events are replayed to recreate the current state of an entity. Below is an example to define the state. You can modify it according to your requirements.

case class UserState(user: Option[User], timeStamp: String)

initialState

Your entity class should also implement abstract method initialState, which defines the state of the entity when it is created for the first time.

override def initialState = UserState(None, LocalDateTime.now().toString)

Behavior

Another method that your concrete subclass should implement is behavior. The behavior is defined as a set of actions or functions.

override def behavior: (UserState) => Actions

To process commands, command handlers are registered using the action onCommand. A command handler is a partial function with three parameters: command, command context (ctx), and current state. A command handler returns a Persist directive that defines what event(s), if any, to persist. thenPersist, thenPersistAll, or done methods are used to create the Persist directive.

.onCommand[CreateUserCommand, Done] {
  case (CreateUserCommand(user), ctx, _) ⇒
    ctx.thenPersist(UserCreated(user))(_ ⇒ ctx.reply(Done))
}

PersistentEntity may also process commands that do not change application state, such as query commands. Such command handlers are registered using onReadOnlyCommand.

.onReadOnlyCommand[GetUserCommand, User] {
   case (GetUserCommand(id), ctx, state) =>
    ctx.reply(state.user.getOrElse(User(id, "not found")))
}

Event handlers are used both for persisting and replaying events. These are registered with the onEvent method of the actions. When an event has been persisted successfully, the current state is updated.

.onEvent {
  case (UserCreated(user), _) ⇒
    UserState(Some(user), LocalDateTime.now().toString)
}

A reply is sent with the ctx.reply method. The reply message type must match the ReplyType defined by the command. It will be an acknowledgment that the entity has processed the command successfully.

You can use ctx.invalidCommand to reject an invalid command, which will fail the feature with PersistentEntity.InvalidCommandException on the sender side.

Here is the complete snapshot of the UserEntity class:

class UserEntity extends PersistentEntity {

  override type Command = UserCommand[_]
  override type Event = UserEvent
  override type State = UserState

  override def initialState = UserState(None, LocalDateTime.now().toString)

  override def behavior: (UserState) => Actions = {
    case UserState(_, _) => Actions()
      .onCommand[CreateUserCommand, Done] {
      case (CreateUserCommand(user), ctx, _) ⇒
        ctx.thenPersist(UserCreated(user))(_ ⇒ ctx.reply(Done))
    }
      .onReadOnlyCommand[GetUserCommand, User] {
       case (GetUserCommand(id), ctx, state) =>
        ctx.reply(state.user.getOrElse(User(id, "not found")))
    }
      .onEvent {
        case (UserCreated(user), _) ⇒
          UserState(Some(user), LocalDateTime.now().toString)
      }
  }

}

Finally, to access the entity, you need to inject the PersistentEntityRegistry into your service implementation class.

class UserServiceImpl(persistentEntityRegistry: PersistentEntityRegistry)

Also, you need to register the persistent entity and JSON serializer registry in your application loader:

//Register the JSON serializer registry override lazy val jsonSerializerRegistry = UserSerializerRegistry// Register the lagom-persistent-entity-demo persistent entity persistentEntityRegistry.register(wire[UserEntity])

In the service method, you can retrieve a PersistentEntityRef for a given entity identifier from the registry. In the user application, a separate method for retrieving the ref is being created.

def ref(id: String): PersistentEntityRef[UserCommand[_]] = {
  persistentEntityRegistry.refFor[UserEntity](id)
}

Then you can send the command to the entity using the ask method of the PersistentEntityRef. It returns a feature with the reply message. An example for that will be sending CreateUserCommand to the user entity.

ref(user.id).ask(CreateUserCommand(user))

And this is how Lagom's persistence helps in managing data for your service.

Now, when you will run the application, you can see in Cassandra query language shell (CQLSH) that keyspace named userdatabase will be created with four tables messages, config, snapshots, and metadata. Events are actually persisted in the messages table.

The complete demo code is available here. You can check the README file for instructions to run the application.

References

Hope you enjoyed reading this blog!

This article was first published on the Knoldus blog.

Learn how to get 20x more performance than Elastic by moving to a Time Series database.

Topics:
lagom ,database ,data persistence ,tutorial ,cqrs ,event sourcing

Published at DZone with permission of Divya Dua, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}