Reactive Programming With Project Reactor

Want to learn more about reactive programming with Project Reactor? This post walks through five example scenarios.

by Piotr Mińkowski · Oct. 24, 18 · Tutorial

If you are building reactive microservices, you will probably need to merge data streams from different source APIs into a single result stream. That inspired me to write this article, which covers some of the most common scenarios for using reactive streams during inter-service communication in a microservice-based architecture. I have already described some aspects of reactive programming with Spring, based on the Spring WebFlux and Spring Data JDBC projects, in the following articles:

  • Reactive Microservices with Spring WebFlux and Spring Cloud
  • Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux

Spring Framework has supported reactive programming since version 5. That support is built on top of Project Reactor. Reactor is a fourth-generation reactive library for building non-blocking applications on the JVM, based on the Reactive Streams specification. Working with this library can be difficult at first, especially if you don’t have any experience with reactive streams. Reactor Core gives us two data types that enable us to produce a stream of data: Mono and Flux. With Flux, we can emit 0..n elements, while with Mono, we can create a stream of 0..1 elements. Both types implement the Publisher interface, and both are lazy, which means nothing is executed until you subscribe to them. When building reactive APIs, it is also important not to block the stream. Spring WebFlux doesn’t allow that.
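
The laziness of Mono and Flux can be illustrated with a minimal sketch. It is not taken from the sample project: the function name lazinessDemo and the log list are hypothetical and exist only for illustration. The callable passed to fromCallable should not run when the pipeline is assembled, only once subscribe is called.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical helper: records the order in which things happen.
fun lazinessDemo(): List<String> {
    val log = mutableListOf<String>()

    // Assembling the pipeline does not run the callable.
    val answer: Mono<Int> = Mono.fromCallable {
        log.add("computed")
        42
    }
    log.add("assembled")

    // Subscribing triggers the computation (synchronously here, since no scheduler is involved).
    answer.subscribe { value -> log.add("received $value") }
    return log
}
```

Calling lazinessDemo() returns the entries in the order "assembled", "computed", "received 42", which shows that the value is produced at subscription time rather than at assembly time.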

Introduction

The sample project is available on GitHub in the repository reactive-playground. It is written in Kotlin. In addition to some Kotlin libraries, the only dependency that needs to be added in order to use Project Reactor is reactor-core.

<dependency>
   <groupId>io.projectreactor</groupId>
   <artifactId>reactor-core</artifactId>
   <version>3.2.1.RELEASE</version>
</dependency>


I don't want to demonstrate the features of Project Reactor with simple String objects, as many other articles do. Instead, I have created the following class hierarchy for our tests, which allows us to simulate APIs built for three different domain objects.

[Figure: reactor-4 (class hierarchy of Organization, Department, and Employee)]


The Organization class contains lists of Employee and Department objects. Each department contains the list of employees assigned to that department within the organization. The Employee class has two additional properties: organizationId, which assigns it to an organization, and departmentId, which assigns it to a department.

data class Employee(var id: Int, var name: String, var salary: Int) {
    var organizationId: Int? = null
    var departmentId: Int? = null

    constructor(id: Int, name: String, salary: Int, organizationId: Int, departmentId: Int) : this(id, name, salary) {
        this.organizationId = organizationId
        this.departmentId = departmentId
    }

    constructor(id: Int, name: String, salary: Int, organizationId: Int) : this(id, name, salary) {
        this.organizationId = organizationId
    }
}


Here’s the implementation of the Department class.

class Department(var id: Int, var name: String, var organizationId: Int) {
    var employees: MutableList<Employee> = ArrayList()

    constructor(id: Int, name: String, organizationId: Int, employees: MutableList<Employee>) : this(id, name, organizationId) {
        this.employees.addAll(employees)
    }

    fun addEmployees(employees: MutableList<Employee>) : Department {
        this.employees.addAll(employees)
        return this
    }

    fun addEmployee(employee: Employee) : Department {
        this.employees.add(employee)
        return this
    }

}


Here’s the implementation of the Organization class.

class Organization(var id: Int, var name: String) {
    var employees: MutableList<Employee> = ArrayList()
    var departments: MutableList<Department> = ArrayList()

    constructor(id: Int, name: String, employees: MutableList<Employee>, departments: MutableList<Department>) : this(id, name){
        this.employees.addAll(employees)
        this.departments.addAll(departments)
    }

    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name){
        this.employees.addAll(employees)
    }
}


Scenario 1

We have two API methods that return data streams. The first returns a Flux emitting the employees assigned to the given organization. The second returns a Mono with the current organization.

private fun getOrganizationByName(name: String) : Mono<Organization> {
   return Mono.just(Organization(1, name))
}

private fun getEmployeesByOrganization(id: Int) : Flux<Employee> {
   return Flux.just(Employee(1, "Employee1", 1000, id),
      Employee(2, "Employee2", 2000, id))
}


We would like to return a single stream emitting the organization together with its list of employees, as shown below.

[Figure: reactor-scenario-1]

Here’s the solution. We use the zipWhen method, which waits for the result of the source Mono and then uses it to call the second Mono. Because zipWhen works on Mono streams, we need to convert the Flux<Employee> returned by the getEmployeesByOrganization method into a Mono<MutableList<Employee>> using the collectList function. zipWhen then combines the two Mono results into a tuple, and we create a new object from that tuple inside the map function.

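@Test
fun testScenario1() {
   val organization: Mono<Organization> = getOrganizationByName("test")
      // wait for the organization, then fetch its employees as a single list
      .zipWhen { organization ->
         getEmployeesByOrganization(organization.id).collectList()
      }
      // build a new Organization from the (organization, employees) tuple
      .map { tuple ->
         Organization(tuple.t1.id, tuple.t1.name, tuple.t2)
      }
}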
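
To check what the zipWhen pipeline actually emits, here is a self-contained sketch. It assumes simplified, immutable stand-ins for the domain classes (the real ones in the sample project are mutable and have more properties), and the function name organizationWithEmployees is hypothetical.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Simplified stand-ins for the article's classes (assumption: trimmed to what this sketch needs).
data class Employee(val id: Int, val name: String, val organizationId: Int)
data class Organization(val id: Int, val name: String, val employees: List<Employee> = emptyList())

fun getOrganizationByName(name: String): Mono<Organization> =
    Mono.just(Organization(1, name))

fun getEmployeesByOrganization(id: Int): Flux<Employee> =
    Flux.just(Employee(1, "Employee1", id), Employee(2, "Employee2", id))

// Hypothetical wrapper around the zipWhen pipeline from the scenario.
fun organizationWithEmployees(name: String): Mono<Organization> =
    getOrganizationByName(name)
        // the second Mono is created from the value emitted by the first one
        .zipWhen { organization -> getEmployeesByOrganization(organization.id).collectList() }
        // t1 is the organization, t2 is the collected employee list
        .map { tuple -> tuple.t1.copy(employees = tuple.t2) }
```

In a test, organizationWithEmployees("test").block() can be used to read the resulting organization and inspect its employees. Calling block() is only appropriate in tests or other blocking contexts, not inside a reactive pipeline.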


Scenario 2

Let’s consider another scenario. Now, we have two Flux streams that emit employees and departments. Every employee has a departmentId property that assigns it to a department.

private fun getDepartments() : Flux<Department> {
    return Flux.just(Department(1, "X", 1),
                     Department(2, "Y", 1))
}

private fun getEmployees() : Flux<Employee> {
    return Flux.just(Employee(1, "Employee1", 1000, 1, 1),
            Employee(2, "Employee2", 2000, 1, 1),
            Employee(3, "Employee3", 1000, 1, 2),
            Employee(4, "Employee4", 2000, 1, 2))
}


The goal is to merge those two streams and return a single Flux stream emitting departments, each containing all the employees assigned to it. Here’s the picture that illustrates the transformation described above.

[Figure: reactor-5]

We can do that in two ways. In the first, we call the flatMap function on the stream of departments. Inside flatMap, we zip every Department with the stream of employees, which is first filtered by departmentId and collected into a Mono holding a list with collectList. Finally, the map function produces a Mono that emits the department containing its list of employees.

The second way groups the Flux of employees by departmentId using groupBy. For each group, it looks up the matching department and then zips and maps in the same way as the first approach.

@Test
fun testScenario2() {
   // First way: for each department, zip it with its filtered list of employees
   val departments: Flux<Department> = getDepartments()
      .flatMap { department ->
         Mono.just(department)
            .zipWith(getEmployees().filter { it.departmentId == department.id }.collectList())
            .map { t -> t.t1.addEmployees(t.t2) }
      }

   // Second way: group employees by departmentId, then attach each group to its department
   val departments2: Flux<Department> = getEmployees()
      .groupBy { it.departmentId }
      .flatMap { group ->
         getDepartments().filter { it.id == group.key() }.elementAt(0)
            .zipWith(group.collectList())
            .map { it.t1.addEmployees(it.t2) }
      }
}
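
Both approaches above subscribe to one of the source streams once per element of the other. As a possible alternative (not part of the original sample project), the sketch below collects the employees into a map keyed by departmentId with collectMultimap and then attaches each group to its department. The simplified data classes and the function name departmentsWithEmployees are assumptions made for this illustration.

```kotlin
import reactor.core.publisher.Flux

// Simplified stand-ins for the article's classes (assumption: immutable, fewer properties).
data class Employee(val id: Int, val name: String, val departmentId: Int)
data class Department(val id: Int, val name: String, val employees: List<Employee> = emptyList())

fun getDepartments(): Flux<Department> =
    Flux.just(Department(1, "X"), Department(2, "Y"))

fun getEmployees(): Flux<Employee> = Flux.just(
    Employee(1, "Employee1", 1), Employee(2, "Employee2", 1),
    Employee(3, "Employee3", 2), Employee(4, "Employee4", 2)
)

// Hypothetical alternative: read the employee stream once, then enrich each department.
fun departmentsWithEmployees(): Flux<Department> =
    getEmployees()
        // Mono of a map from departmentId to the employees of that department
        .collectMultimap { employee -> employee.departmentId }
        .flatMapMany { byDepartment ->
            getDepartments().map { department ->
                // departments without employees get an empty list
                department.copy(employees = byDepartment[department.id]?.toList() ?: emptyList())
            }
        }
```

The trade-off is that all employees are buffered in memory before any department is emitted, so this variant only fits streams that are finite and reasonably small.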


Scenario 3

This scenario is simpler than the two previous ones. We have two API methods that emit a Flux of the same object type. The first emits employees with the id, name, and salary properties set, while the second emits employees with the id, organizationId, and departmentId properties set.

private fun getEmployeesBasic() : Flux<Employee> {
   return Flux.just(Employee(1, "AA", 1000),
                  Employee(2, "BB", 2000))
}

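private fun getEmployeesRelationships() : Flux<Employee> {
   // name and salary are placeholders here: this API only knows id, organizationId, and departmentId
   return Flux.just(Employee(1, "", 0, 1, 1),
     Employee(2, "", 0, 1, 2))
}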


We want to combine them into a single stream emitting employees with the full set of properties. The following picture illustrates the described transformation.

[Figure: reactor-scenario-3]

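In that case, the solution is pretty simple. We zip the two Flux streams using the zipWith function, and then we map the two zipped objects into a single object containing the full set of properties. Note that zipWith pairs elements by position, so this assumes that both streams emit the employees in the same order.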

@Test
fun testScenario3() {
   val employees : Flux<Employee> = getEmployeesBasic()
      .zipWith(getEmployeesRelationships())
      .map { t -> Employee(t.t1.id, t.t1.name, t.t1.salary, t.t2.organizationId!!, t.t2.departmentId!!) }
}
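
Because zipWith pairs elements purely by their position in each stream, it can be worth guarding against streams that are out of step. The sketch below is one way to do that. It uses separate, hypothetical types for the two partial views (EmployeeBasic and EmployeeRelationships) instead of the article's single Employee class, and the function name mergeEmployees is also an assumption.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical types modelling the two partial views and the merged result.
data class EmployeeBasic(val id: Int, val name: String, val salary: Int)
data class EmployeeRelationships(val id: Int, val organizationId: Int, val departmentId: Int)
data class Employee(
    val id: Int, val name: String, val salary: Int,
    val organizationId: Int, val departmentId: Int
)

fun getEmployeesBasic(): Flux<EmployeeBasic> =
    Flux.just(EmployeeBasic(1, "AA", 1000), EmployeeBasic(2, "BB", 2000))

fun getEmployeesRelationships(): Flux<EmployeeRelationships> =
    Flux.just(EmployeeRelationships(1, 1, 1), EmployeeRelationships(2, 1, 2))

fun mergeEmployees(): Flux<Employee> =
    getEmployeesBasic()
        .zipWith(getEmployeesRelationships())
        .map { t ->
            // zipWith pairs by position, so fail fast if the ids do not line up;
            // the exception is propagated downstream as an error signal
            check(t.t1.id == t.t2.id) { "Mismatched ids: ${t.t1.id} and ${t.t2.id}" }
            Employee(t.t1.id, t.t1.name, t.t1.salary, t.t2.organizationId, t.t2.departmentId)
        }
```

If the two APIs cannot guarantee the same ordering, zipping is probably the wrong tool, and joining by id (for example, after collecting one side into a map) would be the safer design.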


Scenario 4

In this scenario, we have two independent Flux streams that emit the same type of objects – Employee.

private fun getEmployeesFirstPart() : Flux<Employee> {
   return Flux.just(Employee(1, "AA", 1000), Employee(3, "BB", 3000))
}

private fun getEmployeesSecondPart() : Flux<Employee> {
   return Flux.just(Employee(2, "CC", 2000), Employee(4, "DD", 4000))
}


We would like to merge those two streams into a single stream ordered by id. The following picture shows that transformation.

[Figure: reactor-scenario-4]

Here’s the solution. We use the mergeOrderedWith function with a comparator that compares ids. Afterwards, we can optionally transform every object; the map call here only demonstrates that usage.

@Test
fun testScenario4() {
   val persons: Flux<Employee> = getEmployeesFirstPart()
      .mergeOrderedWith(getEmployeesSecondPart(), Comparator { o1, o2 -> o1.id.compareTo(o2.id) })
      .map {
         Employee(it.id, it.name, it.salary, 1, 1)
      }
}
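
The following self-contained sketch shows the ordering produced by mergeOrderedWith for the two sample streams. The simplified Employee class and the function name mergedIds are assumptions for this illustration, and block() is used only to read the result in a test-like setting.

```kotlin
import reactor.core.publisher.Flux

// Simplified stand-in for the article's Employee class (assumption).
data class Employee(val id: Int, val name: String, val salary: Int)

fun getEmployeesFirstPart(): Flux<Employee> =
    Flux.just(Employee(1, "AA", 1000), Employee(3, "BB", 3000))

fun getEmployeesSecondPart(): Flux<Employee> =
    Flux.just(Employee(2, "CC", 2000), Employee(4, "DD", 4000))

// Hypothetical helper: merges both streams by id and returns the ids in emission order.
fun mergedIds(): List<Int> =
    getEmployeesFirstPart()
        // repeatedly picks the smaller of the two current head elements
        .mergeOrderedWith(getEmployeesSecondPart(), compareBy<Employee> { it.id })
        .map { it.id }
        .collectList()
        .block() ?: emptyList()
```

For these inputs, mergedIds() returns the ids 1, 2, 3, 4. Keep in mind that mergeOrderedWith is not a full sort: it only compares the elements currently at the head of each source, so each source has to be sorted already for the merged output to be sorted.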


Scenario 5

And now the last scenario in this article: we have a single input stream, Flux<Department>, that emits the departments of an organization. Each department contains the list of all employees assigned to it. Here’s our API method implementation.

private fun getDepartmentsByOrganization(id: Int) : Flux<Department> {
   val dep1 = Department(1, "A", id, mutableListOf(
      Employee(1, "Employee1", 1000, id, 1),
      Employee(2, "Employee2", 2000, id, 1)
   ))
   val dep2 = Department(2, "B", id, mutableListOf(
      Employee(3, "Employee3", 1000, id, 2),
      Employee(4, "Employee4", 2000, id, 2)
   ))
   return Flux.just(dep1, dep2)
}


The goal is to convert that stream into a Mono<Organization> containing the list of all employees from all departments. The following picture visualizes the described transformation.

[Figure: reactor-scenario-5]

Here’s the solution. We invoke the flatMapIterable function, which converts Flux<Department> into Flux<Employee> by returning the List<Employee> of each department. Then, we collect the employees into a Mono with collectList and pass the resulting list to a newly created Organization object inside the map function.

@Test
fun testScenario5() {
   val organization: Mono<Organization> = getDepartmentsByOrganization(1)
      .flatMapIterable { department -> department.employees }
      .collectList()
      .map { t -> Organization(1, "X", t) }
}
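
A self-contained version of this flattening step might look like the sketch below. The simplified, immutable data classes and the function name organizationWithAllEmployees are assumptions made for this illustration, not code from the sample project.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Simplified stand-ins for the article's classes (assumption: immutable, fewer properties).
data class Employee(val id: Int, val name: String)
data class Department(val id: Int, val name: String, val organizationId: Int, val employees: List<Employee>)
data class Organization(val id: Int, val name: String, val employees: List<Employee>)

fun getDepartmentsByOrganization(id: Int): Flux<Department> = Flux.just(
    Department(1, "A", id, listOf(Employee(1, "Employee1"), Employee(2, "Employee2"))),
    Department(2, "B", id, listOf(Employee(3, "Employee3"), Employee(4, "Employee4")))
)

// Hypothetical helper: flattens all departments' employees into one organization.
fun organizationWithAllEmployees(id: Int, name: String): Mono<Organization> =
    getDepartmentsByOrganization(id)
        // each department contributes its employees, in order, to a single Flux of employees
        .flatMapIterable { department -> department.employees }
        // gather the flattened employees into one list
        .collectList()
        .map { employees -> Organization(id, name, employees) }
```

For the sample data, organizationWithAllEmployees(1, "X") emits an organization whose employee list contains the ids 1 to 4, in the order in which the departments emitted them.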


Happy coding!


Published at DZone with permission of Piotr Mińkowski, DZone MVB. See the original article here.
