Divided We Win: an Event Sourcing, CQRS Perspective on Write and Read Models Separation. Queries.

Quite a while ago, we started to explore the command query responsibility segregation (CQRS) architecture as an alternative way to develop distributed systems. Last time we covered only commands and events, but not queries. The goal of this blog post is to fill the gap and discuss ways to handle queries in a CQRS architecture.

We will start where we left off last time, with the sample application that was able to handle commands and persist events in the journal. To support the read path, or queries, we are going to introduce a data store. To keep things simple, let it be an in-memory H2 database. The data access layer is going to be handled by the awesome Slick library.

To begin with, we have to come up with a simple data model for the User class, managed by the UserAggregate persistent actor. In this regard, the Users class is a typical mapping of a relational table:

class Users(tag: Tag) extends Table[User](tag, "USERS") {
  def id = column[String]("ID", O.PrimaryKey)
  def email = column[String]("EMAIL", O.Length(512))
  def uniqueEmail = index("USERS_EMAIL_IDX", email, true)
  def * = (id, email) <> (User.tupled, User.unapply)
}

It is important to notice at this point that we enforce a uniqueness constraint on the User's email. We will come back to this subtle detail later, during the integration with the UserAggregate persistent actor. The next thing we need is a service to manage data store access, namely persisting and querying Users. As we are in the Akka universe, it is going to be an actor as well. Here it is:

// Messages without parameters are singletons, so case objects fit them better
case object CreateSchema
case class FindUserByEmail(email: String)
case class UpdateUser(id: String, email: String)
case object FindAllUsers

trait Persistence {
  val users = TableQuery[Users] 
  val db = Database.forConfig("db")
}

class PersistenceService extends Actor with ActorLogging with Persistence {
  import scala.concurrent.ExecutionContext.Implicits.global

  def receive = {
    // Create the USERS table together with its unique index on EMAIL
    case CreateSchema => db.run(DBIO.seq(users.schema.create))

    // Insert the user or, if the id already exists, update its email
    case UpdateUser(id, email) =>
      db.run(users.insertOrUpdate(User(id, email)))

    case FindUserByEmail(email) =>
      // Capture the sender now: the future completes on another thread
      val replyTo = sender
      db.run(users.filter(_.email === email.toLowerCase).result.headOption)
        .onComplete { replyTo ! _ }

    case FindAllUsers =>
      val replyTo = sender
      db.run(users.result) onComplete { replyTo ! _ }
  }
}

Please notice that PersistenceService is a regular untyped Akka actor, not a persistent one. To keep things focused, we are going to support only four kinds of messages:

  • CreateSchema — to initialize database schema
  • UpdateUser — to update user's email address
  • FindUserByEmail — to query user by its email address
  • FindAllUsers — to query all users in the data store
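
To make the intent of these messages concrete, here is a minimal in-memory sketch of the same operations, without Akka or Slick. The InMemoryUsers class and its method names are hypothetical and exist only for illustration; the real service delegates to the database, where the unique index on EMAIL plays the role of the check inside update.

```scala
final case class User(id: String, email: String)

// Hypothetical in-memory analogue of the USERS table (not the Slick-backed service).
final class InMemoryUsers {
  private var byId = Map.empty[String, User]

  // Insert or update by id; rejects an email already held by a different id,
  // mimicking the unique index on EMAIL.
  def update(id: String, email: String): Either[String, User] =
    byId.values.find(u => u.email == email && u.id != id) match {
      case Some(_) => Left(s"Email '$email' already registered")
      case None =>
        val user = User(id, email)
        byId = byId.updated(id, user)
        Right(user)
    }

  // Counterpart of FindUserByEmail
  def findByEmail(email: String): Option[User] =
    byId.values.find(_.email == email)

  // Counterpart of FindAllUsers, sorted by id to give a stable order
  def findAll: Vector[User] = byId.values.toVector.sortBy(_.id)
}
```

In this sketch a rejected update leaves the store untouched, which is the behaviour we expect from the uniqueness constraint.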

Good, the data store service is ready, but nothing really fills it with data yet. Moving on to the next step, we will refactor UserAggregate, or more precisely the way it handles the UserEmailUpdate command. In its current implementation, the user's email update happens unconditionally. But remember, we imposed a uniqueness constraint on emails, so we are going to change the command logic to account for that: before actually performing the update, we will run a query against the read model (data store) to make sure no other user is already registered with that email.


val receiveCommand: Receive = {
  case UserEmailUpdate(email) =>
    try {
      // Ask the read model whether the email is already taken (blocks until the reply arrives)
      val future = (persistence ? FindUserByEmail(email)).mapTo[Try[Option[User]]]
      val result = Await.result(future, timeout.duration) match {
        case Failure(ex) => Error(id, ex.getMessage)
        case Success(Some(user)) if user.id != id =>
          Error(id, s"Email '$email' already registered")
        case _ =>
          // Email is free (or already ours): persist the event, then update the read model
          persist(UserEmailUpdated(id, email)) { event =>
            updateState(event)
            persistence ! UpdateUser(id, email)
          }
          Acknowledged(id)
      }

      sender ! result
    } catch {
      case ex: Exception if NonFatal(ex) => sender ! Error(id, ex.getMessage)
    }
}

It's pretty simple, but not really idiomatic: the Await.result does not look like it belongs in this code. My first attempt was to use a future / map / recover / pipeTo pipeline to keep the flow completely asynchronous. However, in that case the persist(UserEmailUpdated(id, email)) { event => ... } block would be invoked from another thread most of the time (whenever the result is not ready yet), and what I observed is that it did not run, probably because of the thread context switch. This is consistent with the general rule that persist has to be called from within the actor's own message handling rather than from a future callback. So Await.result came to the rescue here, at the price of blocking the actor while it waits for the read model.

Now, every time a user's email is updated, along with persisting the event we also record this fact in the data store. Nice, we are getting one step closer.
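
The decision taken in the command handler can be isolated as a pure function, which makes the three outcomes easier to see and to test. This is only a sketch under simplifying assumptions: User and UserEmailUpdated are reduced stand-ins for the article's classes, and EmailUpdateDecision.decide is a hypothetical helper that is not part of the sample application.

```scala
import scala.util.{Failure, Success, Try}

// Simplified stand-ins for the article's types (assumptions for this sketch).
final case class User(id: String, email: String)
final case class UserEmailUpdated(id: String, email: String)

object EmailUpdateDecision {
  // Left carries the rejection reason, Right carries the event to persist.
  def decide(id: String, email: String,
             lookup: Try[Option[User]]): Either[String, UserEmailUpdated] =
    lookup match {
      // The read model could not be queried
      case Failure(ex) => Left(ex.getMessage)
      // Another user already owns this email
      case Success(Some(user)) if user.id != id =>
        Left(s"Email '$email' already registered")
      // Email is free, or it already belongs to this very user
      case _ => Right(UserEmailUpdated(id, email))
    }
}
```

Keeping the decision separate from persist and from the actor messaging means the uniqueness rule can be checked without starting an actor system.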

The last thing we have to consider is how to populate the data store from the event journal. Another experimental module from the Akka Persistence portfolio is of great help here: Akka Persistence Query. Among other features, it provides the ability to query the event journal by persistence identifiers, and this is what we are going to do in order to populate the data store from the journal. Not surprisingly, there will be a UserJournal actor responsible for that.


case object InitSchema

class UserJournal(persistence: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case InitSchema => {
      val journal = PersistenceQuery(context.system)
        .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
      // All persistence identifiers currently known to the journal
      val source = journal.currentPersistenceIds()

      implicit val materializer = ActorMaterializer()
      source
        .runForeach { persistenceId =>
          // Replay every event stored for this persistence identifier
          journal.currentEventsByPersistenceId(persistenceId, 0, Long.MaxValue)
            .runForeach { event =>
              event.event match {
                case UserEmailUpdated(id, email) => persistence ! UpdateUser(id, email)
                case _ => // no other event types are expected; ignore them rather than fail with a MatchError
              }
            }
        }
    }
  }
}

Basically, the code is simple enough, but let us reiterate what it does. First, we ask the journal for all the persistence identifiers it has, using the currentPersistenceIds method. Second, for every persistence identifier we query all of its events from the journal. Because there is only one event type in our case, UserEmailUpdated, we directly transform it into the data store service's UpdateUser message.
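
Stripped of the streaming machinery, rebuilding the read model is a fold over the journal. The following is a minimal sketch with the journal modelled as a plain Map from persistence identifier to its ordered events. The Event trait, the reduced case classes and ReadModelReplay.replay are assumptions made for illustration, not the Akka Persistence Query API.

```scala
// Simplified stand-ins for the article's types (assumptions for this sketch).
sealed trait Event
final case class UserEmailUpdated(id: String, email: String) extends Event
final case class User(id: String, email: String)

object ReadModelReplay {
  // Rebuilds the read model (user id -> User) by applying events in order.
  // A later event for the same id overwrites the earlier one, like insertOrUpdate.
  // Assumes each user id appears under a single persistence identifier, so the
  // iteration order across identifiers does not affect the result.
  def replay(journal: Map[String, Seq[Event]]): Map[String, User] =
    journal.values.flatten.foldLeft(Map.empty[String, User]) {
      case (model, UserEmailUpdated(id, email)) =>
        model.updated(id, User(id, email))
    }
}
```

Since only the last email per user survives the fold, replaying the journal after a restart yields the same users that were visible before it.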

Awesome, we are mostly done! The simplest thing in the end is to add another endpoint to UserRoute which returns the list of the existing users by querying the read model.


pathEnd {
  get {
    complete {
      (persistence ? FindAllUsers).mapTo[Try[Vector[User]]] map {
        case Success(users) =>
          HttpResponse(status = OK, entity = users.toJson.compactPrint)
        case Failure(ex) =>
          HttpResponse(status = InternalServerError, entity = ex.getMessage)
      }
    }
  }
}

We are ready to give our revamped CQRS sample application a test drive! Once it is up and running, let's ensure that our journal and data store are empty.

$ curl -X GET http://localhost:38080/api/v1/users
[]

It makes sense, as we haven't created any users yet. Let's do that by updating two users with different email addresses and querying the read model again.

$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@b.com
Email updated: a@b.com

$ curl -X PUT http://localhost:38080/api/v1/users/124 -d email=a@c.com
Email updated: a@c.com

$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"a@b.com"},
  {"id":"124","email":"a@c.com"}
]

As expected, this time the result is different, and we can see that two users are returned. Now for the moment of truth... let's try to update the email of user 123 to the one already used by user 124.

$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@c.com
Email 'a@c.com' already registered

Terrific, this is what we wanted! The read (or query) model is very helpful and works just fine. Please notice that when we restart the application, the read data store is repopulated from the journal and returns the previously created users:

$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"a@b.com"},
  {"id":"124","email":"a@c.com"}
]

With this post coming to an end, we are wrapping up the introduction to the CQRS architecture. Although one may say many things are left out of scope, I hope the examples presented along our journey were useful to illustrate the idea, and that you might consider CQRS as an interesting option for your next project.

As always, the complete source code is available on GitHub. Many thanks to Regis Leray and Esfandiar Amirrahimi, two brilliant developers, for helping out a lot with this series of blog posts.

Topics:
database, cqrs, event sourcing, queries

Published at DZone with permission of Andriy Redko, DZone MVB. See the original article here.
