{{announcement.body}}
{{announcement.title}}

Building Microservices Through Event-Driven Architecture, Part 7: Implementing Event Sourcing on Repositories

DZone 's Guide to

Building Microservices Through Event-Driven Architecture, Part 7: Implementing Event Sourcing on Repositories

In part seven of this popular series, Leye discusses implementing event sourcing on repositories when building microservices.

· Microservices Zone ·
Free Resource
Learn more about implementing event sourcing!

In this article, I will talk about Event Sourcing implementation on Repository.

The repository is responsible for adding events to the eventstore and also retrieving all events from the event store.

When the aggregate is saved, then all uncommitted events related to that aggregateroot are added to the eventstore table.

The eventstore table is append-only (update and delete are not allowed).

You may also like: Building Microservices Through Event-Driven Architecture, Part 1: Application-Specific Business Rules

The schema of the eventstore table will look like this:

dbo.EventStore screenshot

  • The id is the primary key.
  • Version is the version of the aggregate.
  • AggregateId is the identifier of the aggregate.
  • Name is the name of the event: 2@735f8407-16be-44b5-be96-2bab582b5298.
  • TypeName is the type of the event: LogCorner.EduSync.Speech.Domain.Events.Speech.SpeechCreatedEvent, LogCorner.EduSync.Speech.Domain, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null
  • OccurredOn is the event date.
  • The PayLoad is the event stream.
Java




x
11


 
1
{
2
 
           
3
"Title": { "Value": "Introducing Azure Cosmos DB" }, 
4
"Url": { "Value": "https://azure.microsoft.com/en-us/resomurcejjjnns/videos/azure-friday-introducing-azurkke-cosmos-db_g5/" }, 
5
"Description": { "Value": "Kirill Gavrylyuk stops by Azure Friday to talk Cosmos DB with Scott Hanselman. Watch quick overview of the industry's first globally distributed multi-model database service followed by a demo of moving an existing MongoDB app to Cosmos DB with a single config change." }, 
6
"Type": { "Value": 2 }, 
7
"AggregateId": "735f8407-16be-44b5-be96-2bab582b5298", 
8
"EventId": "6eb58cb4-da5e-46d4-8325-e742a20935ab", 
9
"AggregateVersion": 0, 
10
"OcurrendOn": "2019-09-08T10:55:48.5528117Z" 
11
}



IsSync: A boolean that indicates whether the event is synchronized or not.
To rebuild the current state of the aggregate, we have to read all events related to a given aggregateId and then call a function LoadFromHistory. This function belongs to the AggregateRoot class and should apply all events to the aggregate.

Event-Sourcing Interfaces

Let us define IEventStoreRepository interfaces

code snippet

GetByIdAsync is a function that retrieves all events related to the aggregate from the event store. AppendAsync is a function that appends an event to the event store.

Event-Sourcing Implementation

Let us define the EventStore class.

code snippet

  • The id is the identifier of the event stream.
  • Version is the current aggregate version.
  • AggregateId is the identifier of the aggregate.
  • Name is the Name of the event stream.
  • TypeName is the full type of the event (ex : LogCorner.EduSync.Speech.Domain.Events.Speech.SpeechCreatedEvent, LogCorner.EduSync.Speech.Domain, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null)
  • OccurredOn is the date on which the event occurred.
  • SerializedBody is the event serialized in JSON.
  • AppendAsync is a function that indicates whether the event is synchronized or not yet.

AppendaSync Implementation

So the first test is AppendAsync should append an event on eventstore

Test Case 1: AppendaSync Should Append an Event on EventStore

Here I have to mock a Context, create an instance of IEventStoreRepository and call AppendAsync with an EventStore object, then the context should have a set of EventStore with a single element and that element should be equals to the eventstore object I passed in the argument of AppendAsync.

The test may look like this:

code snippet

Instead of using moq I will use entityframeworkcore in-memory database which is a simpler way of Database Unit Testing.

I can instantiate an in-memory context like this:

The next step is to create an EventStoreRepository class.

code snippet

Then add a DbSet<EventStore> EventStore property to DatabaseContex class.

code snippet

The code compiles but test fails because DbSet<EventStore> EventStore is null.

Let us config some mappings.

code snippet

EventStoreEntityTypeConfiguration class has reponsibility to map EventStore class to EventStore database table.

code snippet

The final test looks like this:

code snippet

GetByIdAsync Implementation

GetByIdAsync retrieves all events related to the aggregate from the event store.

Here I have to create an empty aggregate of type Speech, read all events from the eventstore related to it and finally apply the events.

code snippet

Test Case 2: CreateInstance Of AggregateRoot Should Return An Empty Aggregate

code snippet

Let us create an Invoker class with the responsibility to create an empty aggregateroot.

code snippet

The test pass.

Test Case 3: GetByIdAsync With BadAggregateId Should Raise BadAggregateIdException

Given an bad aggregateId (ex: empty aggregateId), GetByIdAsync should raise an exception (BadAggregateIdException).

code snippet

Test Case 4: GetByIdAsyncWithNullInstanceOfAggregateShouldRaiseNullInstanceOfAggregateIdException

Given an aggregateId that does not exist (ex: empty aggregateId), GetByIdAsync should raise an exception (NullInstanceOfAggregateIdException).

code snippet code snippet

Test Case 5: GetByIdAsyncWithoutEventsShouldReturnEmptyList

if there is no event related to a given aggregateId, then GetByIdAsyncshould returns an empty list.

code snippet code snippet

Test Case 6: GetByIdAsyncWithEventsShouldReturnTheCurrentStateOfAggregate

Here, I have to read all events related to the given aggregateId from the event store, rebuild the aggregate state and return it.

To reach this goal, I have to deserialize the event which is in JSON structure according to the event type.

Test Case 6.1: Deserialize Event Stream Should Return an Event

Here I will create a function that deserializes a JSON string to an Event object.

code snippet code snippet code snippet

Test Case 6.2: GetByIdAsyncWithEventsShouldReturnTheCurrentStateOfTheAggregate

Here, I will finish the implementation of my function.

code snippet code snippet infrastructure screenshot

The source code of this article is available on GitHub (Feature/Task/ EventSourcingRepository).

Regards!


Further Reading

Building Microservices Through Event-Driven Architecture, Part 2: Domain Objects and Business Rules

Building Microservices Through Event-Driven Architecture, Part 3: Presenters, Views, and Controllers

Building Microservices Through Event-Driven Architecture, Part 4: Repositories

Building Microservices Through Event-Driven Architecture, Part 5: Dockerization

Building Microservices Through Event-Driven Architecture, Part 6: Implementing EventSourcing on Domain Model

Topics:
event-driven, event-driven architecture, eventsourcing, microservices, repositories

Published at DZone with permission of Gora LEYE , DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}