DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workkloads.

Secure your stack and shape the future! Help dev teams across the globe navigate their software supply chain security challenges.

Releasing software shouldn't be stressful or risky. Learn how to leverage progressive delivery techniques to ensure safer deployments.

Avoid machine learning mistakes and boost model performance! Discover key ML patterns, anti-patterns, data strategies, and more.

Related

  • Evolutionary Architecture: A Solution to the Lost Art of Software Design
  • How To Get Closer to Consistency in Microservice Architecture
  • Configuration As A Service: Spring Cloud Config – Using Kotlin
  • A New Era Of Spring Cloud

Trending

  • Simplify Authorization in Ruby on Rails With the Power of Pundit Gem
  • Chaos Engineering for Microservices
  • Scaling InfluxDB for High-Volume Reporting With Continuous Queries (CQs)
  • SQL Server Index Optimization Strategies: Best Practices with Ola Hallengren’s Scripts
  1. DZone
  2. Software Design and Architecture
  3. Microservices
  4. Kotlin Clean Architecture and CQRS

Kotlin Clean Architecture and CQRS

Explore the implementation of Clean Architecture and CQRS microservice using Kotlin, Spring WebFlux with coroutines, PostgreSQL, MongoDB, and Kafka.

By 
Alexander Bryksin user avatar
Alexander Bryksin
·
Mar. 27, 24 · Tutorial
Likes (2)
Comment
Save
Tweet
Share
2.1K Views

Join the DZone community and get the full member experience.

Join For Free

In this article, we will implement a microservice using Clean Architecture and CQRS. The tech stack uses Kotlin, Spring WebFlux with coroutines, PostgreSQL and MongoDB, Kafka as a message broker, and the Arrow-kt functional library, which, as the documentation says, brings idiomatic functional programming to Kotlin. 

Clean Architecture

Clean architecture diagram

Clean Architecture is one of the more popular software design approaches. It follows the principles of Dependency Inversion, Single Responsibility, and Separation of Concerns. It consists of concentric circles representing different layers, with the innermost layer being the most abstract and the outermost layer representing the user interface and infrastructure. 

By separating the concerns of the various components and enforcing the dependency rule, it becomes much easier to understand and modify the code. Depending on abstractions allows you to design your business logic flexibly without having to know the implementation details. 

The Domain Layer and the Application Layer are the core of the Clean Architecture. These two layers together form the application core, encapsulating the most important business rules of the system. 

Clean Architecture is a domain-centric architectural approach that separates business logic from technical implementation details.

CQRS

CQRS

CQRS stands for Command and Query Responsibility Segregation, a pattern that separates reads and writes into different models, using commands to update data, and queries to read data. Using CQRS, you should have a strict separation between the write model and the read model. Those two models should be processed by separate objects and not be conceptually linked together. Those objects are not physical storage structures but are, for example, command handlers and query handlers. They’re not related to where and how the data will be stored: they’re connected to the processing behavior. 

Command handlers are responsible for handling commands, mutating state, or doing other side effects. Query handlers are responsible for returning the result of the requested query. They give us: 

  • Scalability, which allows for independent scaling of read and write operations 
  • Performance: By separating read and write operations, you can optimize each for performance. Reads can be optimized for fast retrieval by using denormalized data structures, caching, and specialized read models tailored to specific query needs. 
  • Flexibility allows us to model the read and write sides of the application differently, which provides flexibility in designing the data structures and processing logic to best suit the requirements of each operation. This flexibility can lead to a more efficient and maintainable system, especially in complex domains where the read and write requirements differ significantly. 

One of the common misconceptions about CQRS is that the commands and queries should be run on separate databases. This isn’t necessarily true, only that the behaviors and responsibilities for both should be separated. This can be within the code, within the structure of the database, or different databases.

Nothing in an inner circle can know anything about something in an outer circle. In particular, the name of something declared in an outer circle must not be mentioned by the code in the inner circle. That includes functions and classes, variables, or any other named software entity. 

In the real world, understanding Clean Architecture can vary from person to person. Since Clean Architecture emphasizes principles such as separation of concerns, dependency inversion, and abstraction layers, different developers may interpret and implement these principles differently based on their own experiences, knowledge, and project requirements. 

This article shows my personal view of one of the possible ways of implementation. Ultimately, the goal of Clean Architecture is to create software systems that are maintainable, scalable, and easy to understand.

Layers

Layers

Presentation Layer

The Presentation Layer (named api here) is the most outside layer and the entry point to our system. The most important part of the presentation layer is the controllers, which define the API endpoints in our system presented to the outside world and are responsible for:

  • Handling interaction with the outside world
  • Presenting, displaying, or returning responses with the data
  • Translating the outside requests data (map requests to application layer commands)
  • Works with framework-specific configuration setup
  • Works on top of the application layer

API Layer tree

Let's look at the full process of command requests in the microservice. First things first: it accepts REST HTTP requests; validates input; if it's secured, checks credentials, etc.; then maps the request to the DTO the command and calls the AccountCommandService handle method. 

For example, let's look at creating new account and deposit balance commands methods call flow:

Create new account and deposit balance commands methods


Execute Responses


Kotlin
 
@Tag(name = "Accounts", description = "Account domain REST endpoints")
@RestController
@RequestMapping(path = ["/api/v1/accounts"])
class AccountController(
    private val accountCommandService: AccountCommandService,
    private val accountQueryService: AccountQueryService
) {

    @Operation(
        method = "createAccount", operationId = "createAccount", description = "Create new Account",
        responses = [
            ApiResponse(
                description = "Create new Account",
                responseCode = "201",
                content = [Content(
                    mediaType = MediaType.APPLICATION_JSON_VALUE,
                    schema = Schema(implementation = AccountId::class)
                )]
            ),
            ApiResponse(
                description = "bad request response",
                responseCode = "400",
                content = [Content(schema = Schema(implementation = ErrorHttpResponse::class))]
            )],
    )
    @PostMapping
    suspend fun createAccount(
        @Valid @RequestBody request: CreateAccountRequest
    ): ResponseEntity<out Any> = eitherScope(ctx) {
        accountCommandService.handle(request.toCommand()).bind()
    }.fold(
        ifLeft = { mapErrorToResponse(it) },
        ifRight = { ResponseEntity.status(HttpStatus.CREATED).body(it) }
    )

    @Operation(
        method = "depositBalance", operationId = "depositBalance", description = "Deposit balance",
        responses = [
            ApiResponse(
                description = "Deposit balance",
                responseCode = "200",
                content = [Content(
                    mediaType = MediaType.APPLICATION_JSON_VALUE,
                    schema = Schema(implementation = BaseResponse::class)
                )]
            ),
            ApiResponse(
                description = "bad request response",
                responseCode = "400",
                content = [Content(schema = Schema(implementation = ErrorHttpResponse::class))]
            )],
    )
    @PutMapping(path = ["/{id}/deposit"])
    suspend fun depositBalance(
        @PathVariable id: UUID,
        @Valid @RequestBody request: DepositBalanceRequest
    ): ResponseEntity<out Any> = eitherScope(ctx) {
        accountCommandService.handle(request.toCommand(AccountId(id))).bind()
    }.fold(
        ifLeft = { mapErrorToResponse(it) },
        ifRight = { okResponse(it) }
    )
}


Application and Domain Layers

The Application Layer contains the use cases of the application. A use case represents a specific interaction or action that the system can perform. Each use case is implemented as a command or a query. It is part of the whole application core like a Domain Layer and is responsible for:

  • Executing the application use cases (all the actions and commands allowed to be done with the system)
  • Fetch domain objects
  • Manipulating domain objects

Application Layer Tree

The Application Layer AccountCommandService has the business logic, which runs required business rules validations, then applies changes to the domain aggregate, persists domain objects in the database, produces the domain events, and persists them in the outbox table within one single transaction. The current application used some not-required small optimization for outbox publishing. After the command service commits the transaction, we publish the event, but we don't care if this publish fails, because the polling publisher realizes that the Spring scheduler will process it anyway.

Arrow greatly improves developer experience because Kotlin doesn’t ship the Either type with the standard SDK. Either is an entity whose value can be of two different types, called left and right. By convention, the right is for the success case and the left is for the error one. It allows us to express the fact that a call might return a correct value or an error, and differentiate between the two of them. The left/right naming pattern is just a convention. Either is a great way to make the error handling in your code more explicit. Making the code more explicit reduces the amount of context that you need to keep in your head, which in turn makes the code easier to understand.

Kotlin
 
interface AccountCommandService {

    suspend fun handle(command: CreateAccountCommand): Either<AppError, AccountId>

    suspend fun handle(command: ChangeAccountStatusCommand): Either<AppError, Unit>

    suspend fun handle(command: ChangeContactInfoCommand): Either<AppError, Unit>

    suspend fun handle(command: DepositBalanceCommand): Either<AppError, Unit>

    suspend fun handle(command: WithdrawBalanceCommand): Either<AppError, Unit>

    suspend fun handle(command: UpdatePersonalInfoCommand): Either<AppError, Unit>
}

@Service
class AccountCommandServiceImpl(
    private val accountRepository: AccountRepository,
    private val outboxRepository: OutboxRepository,
    private val tx: TransactionalOperator,
    private val eventPublisher: EventPublisher,
    private val serializer: Serializer,
    private val emailVerifierClient: EmailVerifierClient,
    private val paymentClient: PaymentClient
) : AccountCommandService {

    override suspend fun handle(command: CreateAccountCommand): Either<AppError, AccountId> = eitherScope(ctx) {
        emailVerifierClient.verifyEmail(command.contactInfo.email).bind()

        val (account, event) = tx.executeAndAwait {
            val account = accountRepository.save(command.toAccount()).bind()
            val event = outboxRepository.insert(account.toAccountCreatedOutboxEvent(serializer)).bind()
            account to event
        }

        publisherScope.launch { publishOutboxEvent(event) }
        account.accountId
    }

    override suspend fun handle(command: DepositBalanceCommand): Either<AppError, Unit> = eitherScope(ctx) {
        paymentClient.verifyPaymentTransaction(command.accountId.string(), command.transactionId).bind()

        val event = tx.executeAndAwait {
            val foundAccount = accountRepository.getById(command.accountId).bind()
            foundAccount.depositBalance(command.balance).bind()

            val account = accountRepository.update(foundAccount).bind()
            val event = account.toBalanceDepositedOutboxEvent(command.balance, serializer)
            outboxRepository.insert(event).bind()
        }

        publisherScope.launch { publishOutboxEvent(event) }
    }
}


The Domain Layer encapsulates the most important business rules of the system. It is the place where we have to start building core business rules. In the domain-centric architecture, we start developing from the domain. The responsibilities of the Domain Layer are as follows:

  • Defining domain models
  • Defining rules, domain, and business errors
  • Executing the application business logic
  • Enforcing the business rules

Domain Layer Tree

Domain models have data and behavior and represent the domain. We have two approaches for designing: rich and anemic domain models. 

  • Anemic models allow external manipulation of our data, and it's usually antipattern because the domain object itself doesn't control its own data. 
  • Rich domain models contain both data and behavior. The richer the behavior, the richer the domain model. It exposes only a specific set of public methods, which allows manipulation of data only in the way the domain approves, encapsulates logic, and does validations. Rich domain model properties are read-only by default.

Domain models can be always valid or not; it's better to prefer always-valid domain models. At any point in time when we're working with domain state, we know it's valid and don't need to write additional validations to check it. Always-valid domain models mean they are in a valid state all the time. 

One more important detail is Persistence Ignorance - modeling the domain without taking into account how domain objects will be persisted.

Kotlin
 
class Account(
    val accountId: AccountId = AccountId(),
) {
    var contactInfo: ContactInfo = ContactInfo()
        private set
    var personalInfo: PersonalInfo = PersonalInfo()
        private set
    var address: Address = Address()
        private set
    var balance: Balance = Balance()
        private set
    var status: AccountStatus = AccountStatus.FREE
        private set

    var version: Long = 0
        private set
    var updatedAt: Instant? = null
        private set
    var createdAt: Instant? = null
        private set

    fun depositBalance(newBalance: Balance): Either<AppError, Account> = either {
        if (balance.balanceCurrency != newBalance.balanceCurrency) raise(InvalidBalanceCurrency("invalid currency: $newBalance"))
        if (newBalance.amount < 0) raise(InvalidBalanceAmount("invalid balance amount: $newBalance"))

        balance = balance.copy(amount = (balance.amount + newBalance.amount))
        updatedAt = Instant.now()

        this@Account
    }

    fun withdrawBalance(newBalance: Balance): Either<AppError, Account> = either {
        if (balance.balanceCurrency != newBalance.balanceCurrency) raise(InvalidBalanceCurrency("invalid currency: $newBalance"))
        if (newBalance.amount < 0) raise(InvalidBalanceAmount("invalid balance amount: $newBalance"))

        val newAmount = (balance.amount - newBalance.amount)
        if ((newAmount) < 0) raise(InvalidBalanceError("invalid balance: $newBalance"))

        balance = balance.copy(amount = newAmount)
        updatedAt = Instant.now()

        this@Account
    }

    fun updateStatus(newStatus: AccountStatus): Either<AppError, Account> = either {
        status = newStatus
        updatedAt = Instant.now()
        this@Account
    }

    fun changeContactInfo(newContactInfo: ContactInfo): Either<AppError, Account> = either {
        contactInfo = newContactInfo
        updatedAt = Instant.now()
        this@Account
    }

    fun changeAddress(newAddress: Address): Either<AppError, Account> = either {
        address = newAddress
        updatedAt = Instant.now()
        this@Account
    }

    fun changePersonalInfo(newPersonalInfo: PersonalInfo): Either<AppError, Account> = either {
        personalInfo = newPersonalInfo
        updatedAt = Instant.now()
        this@Account
    }

    fun incVersion(amount: Long = 1): Either<AppError, Account> = either {
        if (amount < 1) raise(InvalidVersion("invalid version: $amount"))
        version += amount
        updatedAt = Instant.now()
        this@Account
    }

    fun withVersion(amount: Long = 1): Account {
        version = amount
        updatedAt = Instant.now()
        return this
    }

    fun decVersion(amount: Long = 1): Either<AppError, Account> = either {
        if (amount < 1) raise(InvalidVersion("invalid version: $amount"))
        version -= amount
        updatedAt = Instant.now()
        this@Account
    }

    fun withUpdatedAt(newValue: Instant): Account {
        updatedAt = newValue
        return this
    }
}


Infrastructure Layer

Next is the Infrastructure Layer, which contains implementations for external-facing services and is responsible for:

  • Interacting with the persistence solution
  • Interacting with other services (HTTP or gRPC clients, message brokers, etc.)
  • Actual implementations of the interfaces from the application layer
  • Identity concerns

Infrastructure Layer tree


Accounts and outbox_table

At the Infrastructure Layer, we have implementations of the Application Layer interfaces. The main write database used PostgreSQL with r2dbc reactive driver, and DatabaseClient with raw SQL queries. If we want to use an ORM entity, we still pass domain objects through the other layer interfaces anyway, and then inside the repository implementation, code map to the ORM entities. For this project, keep Spring annotations as is; but if we want cleaner implementation, it's possible to move them to another layer. In this example, the project SQL schema is simplified and not normalized.

Kotlin
 
interface AccountRepository {

    suspend fun getById(id: AccountId): Either<AppError, Account>

    suspend fun save(account: Account): Either<AppError, Account>

    suspend fun update(account: Account): Either<AppError, Account>
}

@Repository
class AccountRepositoryImpl(
    private val dbClient: DatabaseClient
) : AccountRepository {

    override suspend fun save(account: Account): Either<AppError, Account> = eitherScope<AppError, Account>(ctx) {
        dbClient.sql(INSERT_ACCOUNT_QUERY.trimMargin())
            .bindValues(account.withVersion(FIRST_VERSION).toPostgresEntityMap())
            .fetch()
            .rowsUpdated()
            .awaitSingle()

        account
    }

    override suspend fun update(account: Account): Either<AppError, Account> = eitherScope(ctx) {
        dbClient.sql(OPTIMISTIC_UPDATE_QUERY.trimMargin())
            .bindValues(account.withUpdatedAt(Instant.now()).toPostgresEntityMap(withOptimisticLock = true))
            .fetch()
            .rowsUpdated()
            .awaitSingle()

        account.incVersion().bind()
    }

    override suspend fun getById(id: AccountId): Either<AppError, Account> = eitherScope(ctx) {
        dbClient.sql(GET_ACCOUNT_BY_ID_QUERY.trimMargin())
            .bind(ID_FIELD, id.id)
            .map { row, _ -> row.toAccount() }
            .awaitSingleOrNull()
            ?: raise(AccountNotFoundError("account for id: $id not found"))
    }
}


Below is an important detail about outbox repository realization:

Outbox repository realization

To be able to handle the case of multiple pod instances processing in parallel outbox table, of course, we have idempotent consumers. However, as we can, we have to avoid processing the same table events more than one time. To prevent multiple instances from selecting and publishing the same events, we use FOR UPDATE SKIP LOCKED. This combination does the next thing: When one instance tries to select a batch of outbox events, if some other instance already selected these records, first, one will skip locked records and select the next available and not locked, and so on. But again, it's only my personal preferred way of implementation. The use of only polling publishers is usually the default one. As a possible alternative, use Debezium (for example), but it's up to you.

Kotlin
 
interface OutboxRepository {

    suspend fun insert(event: OutboxEvent): Either<AppError, OutboxEvent>

    suspend fun deleteWithLock(
        event: OutboxEvent,
        callback: suspend (event: OutboxEvent) -> Either<AppError, Unit>
    ): Either<AppError, OutboxEvent>

    suspend fun deleteEventsWithLock(
        batchSize: Int,
        callback: suspend (event: OutboxEvent) -> Either<AppError, Unit>
    ): Either<AppError, Unit>
}

@Component
class OutboxRepositoryImpl(
    private val dbClient: DatabaseClient,
    private val tx: TransactionalOperator
) : OutboxRepository {

    override suspend fun insert(event: OutboxEvent): Either<AppError, OutboxEvent> = eitherScope(ctx) {
        dbClient.sql(INSERT_OUTBOX_EVENT_QUERY.trimMargin())
            .bindValues(event.toPostgresValuesMap())
            .map { row, _ -> row.get(ROW_EVENT_ID, String::class.java) }
            .one()
            .awaitSingle()
            .let { event }
    }


    override suspend fun deleteWithLock(
        event: OutboxEvent,
        callback: suspend (event: OutboxEvent) -> Either<AppError, Unit>
    ): Either<AppError, OutboxEvent> = eitherScope {
        tx.executeAndAwait {
            dbClient.sql(GET_OUTBOX_EVENT_BY_ID_FOR_UPDATE_SKIP_LOCKED_QUERY.trimMargin())
                .bindValues(mutableMapOf(EVENT_ID to event.eventId))
                .map { row, _ -> row.get(ROW_EVENT_ID, String::class.java) }
                .one()
                .awaitSingleOrNull()

            callback(event).bind()
            deleteOutboxEvent(event).bind()
            event
        }
    }


    override suspend fun deleteEventsWithLock(
        batchSize: Int,
        callback: suspend (event: OutboxEvent) -> Either<AppError, Unit>
    ): Either<AppError, Unit> = eitherScope(ctx) {
        tx.executeAndAwait {
            dbClient.sql(GET_OUTBOX_EVENTS_FOR_UPDATE_SKIP_LOCKED_QUERY.trimMargin())
                .bind(LIMIT, batchSize)
                .map { row, _ -> row.toOutboxEvent() }
                .all()
                .asFlow()
                .onStart { log.info { "start publishing outbox events batch: $batchSize" } }
                .onEach { callback(it).bind() }
                .onEach { event -> deleteOutboxEvent(event).bind() }
                .onCompletion { log.info { "completed publishing outbox events batch: $batchSize" } }
                .collect()
        }
    }

    private suspend fun deleteOutboxEvent(event: OutboxEvent): Either<AppError, Long> = eitherScope(ctx) {
        dbClient.sql(DELETE_OUTBOX_EVENT_BY_ID_QUERY)
            .bindValues(mutableMapOf(EVENT_ID to event.eventId))
            .fetch()
            .rowsUpdated()
            .awaitSingle()
    }
}


The polling publisher implementation is a scheduled process that does the same job for publishing and deleting events at the given interval, as typed earlier, and uses the same service method:

Kotlin
 
@Component
@ConditionalOnProperty(prefix = "schedulers", value = ["outbox.enable"], havingValue = "true")
class OutboxScheduler(
    private val outboxRepository: OutboxRepository,
    private val publisher: EventPublisher,
) {

    @Value("\${schedulers.outbox.batchSize}")
    private var batchSize: Int = 30

    @Scheduled(
        initialDelayString = "\${schedulers.outbox.initialDelayMillis}",
        fixedRateString = "\${schedulers.outbox.fixedRate}"
    )
    fun publishOutboxEvents() = runBlocking {
        eitherScope {
            outboxRepository.deleteEventsWithLock(batchSize) { publisher.publish(it) }.bind()
        }.fold(
            ifLeft = { err -> log.error { "error while publishing scheduler outbox events: $err" } },
            ifRight = { log.info { "outbox scheduler published events" } }
        )
    }
}


Produced messages

A domain event is something interesting from a business point of view that happened within the system; something that already occurred. We're capturing the fact something happened with the system. After events have been published from the outbox table to the broker, in this application, it consumes them from Kafka, and the consumers themselves call EventHandlerService methods, which builds a read model for our domain aggregates.

The read model of a CQRS-based system provides materialized views of the data, typically as highly denormalized views. These views are tailored to the interfaces and display requirements of the application, which helps to maximize both display and query performance. For error handling and retry, messages prefer to use separate retry topics and listeners. Using the stream of events as the write store rather than the actual data at a point in time avoids update conflicts on a single aggregate and maximizes performance and scalability. The events can be used to asynchronously generate materialized views of the data that are used to populate the read store. As with any system where the write and read stores are separate, systems based on this pattern are only eventually consistent. There will be some delay between the event being generated and the data store being updated. 

Here is Kafka consumer implementation:

Kotlin
 
@Component
class BalanceDepositedEventConsumer(
    private val eventProcessor: EventProcessor,
    private val kafkaTopics: KafkaTopics
) {


    @KafkaListener(
        groupId = "\${kafka.consumer-group-id:account_microservice_group_id}",
        topics = ["\${topics.accountBalanceDeposited.name}"],
    )
    fun process(ack: Acknowledgment, record: ConsumerRecord<String, ByteArray>) = eventProcessor.process(
        ack = ack,
        consumerRecord = record,
        deserializationClazz = BalanceDepositedEvent::class.java,
        onError = eventProcessor.errorRetryHandler(kafkaTopics.accountBalanceDepositedRetry.name, DEFAULT_RETRY_COUNT)
    ) { event ->
        eventProcessor.on(
            ack = ack,
            consumerRecord = record,
            event = event,
            retryTopic = kafkaTopics.accountBalanceDepositedRetry.name
        )
    }

    @KafkaListener(
        groupId = "\${kafka.consumer-group-id:account_microservice_group_id}",
        topics = ["\${topics.accountBalanceDepositedRetry.name}"],
    )
    fun processRetry(ack: Acknowledgment, record: ConsumerRecord<String, ByteArray>) = eventProcessor.process(
        ack = ack,
        consumerRecord = record,
        deserializationClazz = BalanceDepositedEvent::class.java,
        onError = eventProcessor.errorRetryHandler(kafkaTopics.accountBalanceDepositedRetry.name, DEFAULT_RETRY_COUNT)
    ) { event ->
        eventProcessor.on(
            ack = ack,
            consumerRecord = record,
            event = event,
            retryTopic = kafkaTopics.accountBalanceDepositedRetry.name
        )
    }
}


At the Application Layer, AccountEventsHandlerService is implemented in the following way:

Kotlin
 
interface AccountEventHandlerService {

    suspend fun on(event: AccountCreatedEvent): Either<AppError, Unit>

    suspend fun on(event: BalanceDepositedEvent): Either<AppError, Unit>

    suspend fun on(event: BalanceWithdrawEvent): Either<AppError, Unit>

    suspend fun on(event: PersonalInfoUpdatedEvent): Either<AppError, Unit>

    suspend fun on(event: ContactInfoChangedEvent): Either<AppError, Unit>

    suspend fun on(event: AccountStatusChangedEvent): Either<AppError, Unit>
}

@Component
class AccountEventHandlerServiceImpl(
    private val accountProjectionRepository: AccountProjectionRepository
) : AccountEventHandlerService {

    override suspend fun on(event: AccountCreatedEvent): Either<AppError, Unit> = eitherScope(ctx) {
        accountProjectionRepository.save(event.toAccount()).bind()
    }

    override suspend fun on(event: BalanceDepositedEvent): Either<AppError, Unit> = eitherScope(ctx) {
        findAndUpdateAccountById(event.accountId, event.version) { account ->
            account.depositBalance(event.balance).bind()
        }.bind()
    }

    private suspend fun findAndUpdateAccountById(
        accountId: AccountId,
        eventVersion: Long,
        block: suspend (Account) -> Account
    ): Either<AppError, Account> = eitherScope(ctx) {
        val foundAccount = findAndValidateVersion(accountId, eventVersion).bind()
        val accountForUpdate = block(foundAccount)
        accountProjectionRepository.update(accountForUpdate).bind()
    }

    private suspend fun findAndValidateVersion(
        accountId: AccountId,
        eventVersion: Long
    ): Either<AppError, Account> = eitherScope(ctx) {
        val foundAccount = accountProjectionRepository.getById(accountId).bind()
        validateVersion(foundAccount, eventVersion).bind()
        foundAccount
    }
}


The infrastructure layer read model repository uses MongoDB's Kotlin coroutines driver:

Infrastructure layer read model repository

Kotlin
 
interface AccountProjectionRepository {

    suspend fun save(account: Account): Either<AppError, Account>

    suspend fun update(account: Account): Either<AppError, Account>

    suspend fun getById(id: AccountId): Either<AppError, Account>

    suspend fun getByEmail(email: String): Either<AppError, Account>

    suspend fun getAll(page: Int, size: Int): Either<AppError, AccountsList>

    suspend fun upsert(account: Account): Either<AppError, Account>
}
Kotlin
 
@Component
class AccountProjectionRepositoryImpl(
    mongoClient: MongoClient,
) : AccountProjectionRepository {

    private val accountsDB = mongoClient.getDatabase(ACCOUNTS_DB)
    private val accountsCollection = accountsDB.getCollection<AccountDocument>(ACCOUNTS_COLLECTION)

    override suspend fun save(account: Account): Either<AppError, Account> = eitherScope<AppError, Account>(ctx) {
        val insertResult = accountsCollection.insertOne(account.toDocument())
        log.info { "account insertOneResult: $insertResult, account: $account" }
        account
    }

    override suspend fun update(account: Account): Either<AppError, Account> = eitherScope(ctx) {
        val filter = and(eq(ACCOUNT_ID, account.accountId.string()), eq(VERSION, account.version))
        val options = FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER)

        accountsCollection.findOneAndUpdate(
            filter,
            account.incVersion().bind().toBsonUpdate(),
            options
        )
            ?.toAccount()
            ?: raise(AccountNotFoundError("account with id: ${account.accountId} not found"))
    }

    override suspend fun upsert(account: Account): Either<AppError, Account> = eitherScope(ctx) {
        val filter = and(eq(ACCOUNT_ID, account.accountId.string()))
        val options = FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)

        accountsCollection.findOneAndUpdate(
            filter,
            account.toBsonUpdate(),
            options
        )
            ?.toAccount()
            ?: raise(AccountNotFoundError("account with id: ${account.accountId} not found"))
    }

    override suspend fun getById(id: AccountId): Either<AppError, Account> = eitherScope(ctx) {
        accountsCollection.find<AccountDocument>(eq(ACCOUNT_ID, id.string()))
            .firstOrNull()
            ?.toAccount()
            ?: raise(AccountNotFoundError("account with id: $id not found"))
    }

    override suspend fun getByEmail(email: String): Either<AppError, Account> = eitherScope(ctx) {
        val filter = and(eq(CONTACT_INFO_EMAIL, email))
        accountsCollection.find(filter).firstOrNull()?.toAccount()
            ?: raise(AccountNotFoundError("account with email: $email not found"))
    }

    override suspend fun getAll(
        page: Int,
        size: Int
    ): Either<AppError, AccountsList> = eitherScope<AppError, AccountsList>(ctx) {
        parZip(coroutineContext, {
            accountsCollection.find()
                .skip(page * size)
                .limit(size)
                .map { it.toAccount() }
                .toList()
        }, {
            accountsCollection.find().count()
        }) { list, totalCount ->
            AccountsList(
                page = page,
                size = size,
                totalCount = totalCount,
                accountsList = list
            )
        }
    }
}


Account parameters

Read queries' way through the layers is very similar: we accept HTTP requests at the API layer:

Kotlin
 
@Tag(name = "Accounts", description = "Account domain REST endpoints")
@RestController
@RequestMapping(path = ["/api/v1/accounts"])
class AccountController(
    private val accountCommandService: AccountCommandService,
    private val accountQueryService: AccountQueryService
) {
    @Operation(
        method = "getAccountByEmail", operationId = "getAccountByEmail", description = "Get account by email",
        responses = [
            ApiResponse(
                description = "Get account by email",
                responseCode = "200",
                content = [Content(
                    mediaType = MediaType.APPLICATION_JSON_VALUE,
                    schema = Schema(implementation = AccountResponse::class)
                )]
            ),
            ApiResponse(
                description = "bad request response",
                responseCode = "400",
                content = [Content(schema = Schema(implementation = ErrorHttpResponse::class))]
            )],
    )
    @GetMapping(path = ["/email/{email}"])
    suspend fun getAccountByEmail(
        @PathVariable @Email @Size(
            min = 6,
            max = 255
        ) email: String
    ): ResponseEntity<out Any> = eitherScope(ctx) {
        accountQueryService.handle(GetAccountByEmailQuery(email)).bind()
    }.fold(
        ifLeft = { mapErrorToResponse(it) },
        ifRight = { ResponseEntity.ok(it.toResponse()) }
    )
}


Application Layer AccountQueryService methods:

Kotlin
 
interface AccountQueryService {

    suspend fun handle(query: GetAccountByIdQuery): Either<AppError, Account>

    suspend fun handle(query: GetAccountByEmailQuery): Either<AppError, Account>

    suspend fun handle(query: GetAllAccountsQuery): Either<AppError, AccountsList>
}
Kotlin
 
@Service
class AccountQueryServiceImpl(
    private val accountRepository: AccountRepository,
    private val accountProjectionRepository: AccountProjectionRepository
) : AccountQueryService {

    override suspend fun handle(query: GetAccountByIdQuery): Either<AppError, Account> = eitherScope(ctx) {
        accountRepository.getById(query.id).bind()
    }

    override suspend fun handle(query: GetAccountByEmailQuery): Either<AppError, Account> = eitherScope(ctx) {
        accountProjectionRepository.getByEmail(query.email).bind()
    }

    override suspend fun handle(query: GetAllAccountsQuery): Either<AppError, AccountsList> = eitherScope(ctx) {
        accountProjectionRepository.getAll(page = query.page, size = query.size).bind()
    }
}


And it uses PostgreSQL or MongoDB repositories to get the data depending on the query use case:

Kotlin
 
@Component
class AccountProjectionRepositoryImpl(
    mongoClient: MongoClient,
) : AccountProjectionRepository {

    private val accountsDB = mongoClient.getDatabase(ACCOUNTS_DB)
    private val accountsCollection = accountsDB.getCollection<AccountDocument>(ACCOUNTS_COLLECTION)

    override suspend fun getByEmail(email: String): Either<AppError, Account> = eitherScope(ctx) {
        val filter = and(eq(CONTACT_INFO_EMAIL, email))
        accountsCollection.find(filter)
            .firstOrNull()
            ?.toAccount()
            ?: raise(AccountNotFoundError("account with email: $email not found"))
    }
}


Final Thoughts

In real-world applications, we have to implement many more necessary features, like K8s health checks, circuit breakers, rate limiters, etc., so this project is simplified for demonstration purposes. The source code is in my GitHub, please star it if is helpful and useful for you. For feedback or questions, feel free to contact me!

Architecture Kotlin (programming language) Software design Spring Framework microservice

Published at DZone with permission of Alexander Bryksin. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Evolutionary Architecture: A Solution to the Lost Art of Software Design
  • How To Get Closer to Consistency in Microservice Architecture
  • Configuration As A Service: Spring Cloud Config – Using Kotlin
  • A New Era Of Spring Cloud

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!