{{announcement.body}}
{{announcement.title}}

How to Create a Spring Cloud Stream Binder From Scratch

DZone 's Guide to

How to Create a Spring Cloud Stream Binder From Scratch

Learn how to develop a custom Spring Cloud Stream binder from scratch.

· Java Zone ·
Free Resource
Learn how to develop a custom Spring Cloud Stream binder from scratch.

Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration that is designed to build event-driven microservices communicating via one or more shared messaging systems.

The core Spring Cloud Stream component is called Binder,” a crucial abstraction that’s already been implemented for the most common messaging systems (e.g. Apache Kafka, Kafka Streams, Google PubSub, RabbitMQ, Azure EventHub, and Azure ServiceBus).

You may also like: Spring Cloud Stream With Kafka

In this article, we’ll see in detail how to develop a custom Spring Cloud Stream binder from scratch.

Introduction

The official Spring Cloud Stream documentation already provides a very basic explanation of how to implement your own Spring Cloud Stream binder.

Here’s a brief excerpt from it about the Binder Service Provider Interface that must be implemented in order to create a custom binder:

The Binder SPI consists of a number of interfaces, out-of-the box utility classes, and discovery strategies that provide a pluggable mechanism for connecting to external middleware. The key point of the SPI is the Binder interface, which is a strategy for connecting inputs and outputs to external middleware. The following listing shows the definition of the Binder interface:
Java




x


1
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {    
2
  Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
3
  Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
4
}



And here’s one more documentation snippet that’s basically a micro-tutorial about developing Spring Cloud Stream binders:

A typical binder implementation consists of the following: a class that implements the Binder interface; a Spring @Configuration class that creates a bean of type Binder along with the middleware connection infrastructure; a META-INF/spring.binders file found on the classpath containing one or more binder definitions, as shown in the following example:
Java




xxxxxxxxxx
1


1
kafka:\ 
2
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration



Even though the above documentation is quite helpful to get started, it would definitely help to have a more thoroughly detailed guide and a practical example go along with it.

TL;DR: Just Gimme the Code

If you don't have the time to go through this detailed tutorial, you can jump to my demo on GitHub, which includes a custom file-based Spring Cloud Stream binder like the one shown below and a sample application that depends on it.

Developing the Custom Binder

Let’s get our hands dirty by developing a custom Spring Cloud Stream binder that consumes events by reading files and produces events by writing to files!

Create a new Maven project with a pom.xml file similar to the following, which includes the dependency to Spring Cloud Stream:

XML




xxxxxxxxxx
1
71


 
1
<?xml version="1.0" encoding="UTF-8"?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0"
3
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
5
<modelVersion>4.0.0</modelVersion>
6
<groupId>com.example</groupId>
7
<artifactId>spring-cloud-stream-custom-binder</artifactId>
8
<version>0.0.1-SNAPSHOT</version>
9
<name>spring-cloud-stream-custom-binder</name>
10
<description>A demo custom Spring Cloud Stream Binder</description>
11
 
          
12
<properties>
13
<java.version>1.8</java.version>
14
<maven.compiler.source>1.8</maven.compiler.source>
15
<maven.compiler.target>1.8</maven.compiler.target>
16
<spring-cloud.version>Hoxton.RC1</spring-cloud.version>
17
<spring-boot.version>2.2.0.RELEASE</spring-boot.version>
18
</properties>
19
 
          
20
<dependencies>
21
<dependency>
22
<groupId>org.springframework.cloud</groupId>
23
<artifactId>spring-cloud-stream</artifactId>
24
</dependency>
25
 
          
26
<dependency>
27
<groupId>org.springframework.boot</groupId>
28
<artifactId>spring-boot-starter-test</artifactId>
29
<scope>test</scope>
30
<exclusions>
31
<exclusion>
32
<groupId>org.junit.vintage</groupId>
33
<artifactId>junit-vintage-engine</artifactId>
34
</exclusion>
35
</exclusions>
36
</dependency>
37
<dependency>
38
<groupId>org.springframework.cloud</groupId>
39
<artifactId>spring-cloud-stream-test-support</artifactId>
40
<scope>test</scope>
41
</dependency>
42
</dependencies>
43
 
          
44
<dependencyManagement>
45
<dependencies>
46
<dependency>
47
<groupId>org.springframework.boot</groupId>
48
<artifactId>spring-boot-dependencies</artifactId>
49
<version>${spring-boot.version}</version>
50
<type>pom</type>
51
<scope>import</scope>
52
</dependency>
53
<dependency>
54
<groupId>org.springframework.cloud</groupId>
55
<artifactId>spring-cloud-dependencies</artifactId>
56
<version>${spring-cloud.version}</version>
57
<type>pom</type>
58
<scope>import</scope>
59
</dependency>
60
</dependencies>
61
</dependencyManagement>
62
 
          
63
<repositories>
64
<repository>
65
<id>spring-milestones</id>
66
<name>Spring Milestones</name>
67
<url>https://repo.spring.io/milestone</url>
68
</repository>
69
</repositories>
70
 
          
71
</project>



Technically, we just need to provide our org.springframework.cloud.stream.binder.Binder implementation, but practically, the binder depends on two more components that we need to provide first: the ProvisioningProvider and the MessageProducer.

The ProvisioningProvider is responsible for the provisioning of consumer and producer destinations, and it is particularly useful to convert the logical destinations included in the application.yml or application.properties file in physical destination references (you could look Spring beans up by destination name, or simply trim the destination name as we do in the following snippet):

Java




xxxxxxxxxx
1
48


1
package com.example.springcloudstreamcustombinder.provisioners;
2
 
          
3
import org.springframework.cloud.stream.binder.ConsumerProperties;
4
import org.springframework.cloud.stream.binder.ProducerProperties;
5
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
6
import org.springframework.cloud.stream.provisioning.ProducerDestination;
7
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
8
 
          
9
public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {
10
 
          
11
    @Override
12
    public ProducerDestination provisionProducerDestination(
13
            final String name,
14
            final ProducerProperties properties) {
15
 
          
16
        return new FileMessageDestination(name);
17
    }
18
 
          
19
    @Override
20
    public ConsumerDestination provisionConsumerDestination(
21
            final String name,
22
            final String group,
23
            final ConsumerProperties properties) {
24
 
          
25
        return new FileMessageDestination(name);
26
    }
27
 
          
28
    private class FileMessageDestination implements ProducerDestination, ConsumerDestination {
29
 
          
30
        private final String destination;
31
 
          
32
        private FileMessageDestination(final String destination) {
33
            this.destination = destination;
34
        }
35
 
          
36
        @Override
37
        public String getName() {
38
            return destination.trim();
39
        }
40
 
          
41
        @Override
42
        public String getNameForPartition(int partition) {
43
            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
44
        }
45
 
          
46
    }
47
 
          
48
}



The MessageProducer  — unlike the name suggests — is responsible for consuming the events and handling them as messages to the client application that is configured to consume such events.

Here is an example of a MessageProducer  implementation that polls on a file that matches the trimmed destination name and is located in the project path, while also archiving read messages and discarding consequent identical messages:

Java




xxxxxxxxxx
1
73


 
1
package com.example.springcloudstreamcustombinder.producers;
2
 
          
3
import java.io.IOException;
4
import java.nio.file.Files;
5
import java.nio.file.Paths;
6
import java.util.List;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.ScheduledExecutorService;
9
 
          
10
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
11
import org.springframework.integration.endpoint.MessageProducerSupport;
12
import org.springframework.integration.support.MessageBuilder;
13
import org.springframework.messaging.Message;
14
 
          
15
import static java.nio.file.StandardOpenOption.*;
16
import static java.util.concurrent.TimeUnit.*;
17
 
          
18
public class FileMessageProducer extends MessageProducerSupport {
19
 
          
20
    public static final String ARCHIVE = "archive.txt";
21
    private final ConsumerDestination destination;
22
    private String previousPayload;
23
 
          
24
    public FileMessageProducer(ConsumerDestination destination) {
25
        this.destination = destination;
26
    }
27
 
          
28
    @Override
29
    public void doStart() {
30
        receive();
31
    }
32
 
          
33
    private void receive() {
34
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
35
 
          
36
        executorService.scheduleWithFixedDelay(() -> {
37
            String payload = getPayload();
38
 
          
39
            if(payload != null) {
40
                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
41
                archiveMessage(payload);
42
                sendMessage(receivedMessage);
43
            }
44
 
          
45
        }, 0, 50, MILLISECONDS);            
46
    }
47
 
          
48
    private String getPayload() {
49
        try {
50
            List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
51
            String currentPayload = allLines.get(allLines.size() - 1);
52
 
          
53
            if(!currentPayload.equals(previousPayload)) {
54
                previousPayload = currentPayload;
55
                return currentPayload;
56
            }
57
        } catch (IOException e) {
58
            throw new RuntimeException(e);
59
        }
60
 
          
61
        return null;
62
    }
63
 
          
64
    private void archiveMessage(String payload) {
65
        try {
66
            Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
67
        } catch (IOException e) {
68
            throw new RuntimeException(e);
69
        }
70
    }
71
 
          
72
}
73
 
          



Finally, with all the required components in place, we can implement our own Binder by extending the AbstractMessageChannelBinder class, providing the required constructors and overriding the inherited abstract methods:

Java




xxxxxxxxxx
1
57


 
1
package com.example.springcloudstreamcustombinder;
2
 
          
3
import java.io.IOException;
4
import java.nio.file.Files;
5
import java.nio.file.Paths;
6
 
          
7
import com.example.springcloudstreamcustombinder.producers.FileMessageProducer;
8
import com.example.springcloudstreamcustombinder.provisioners.FileMessageBinderProvisioner;
9
 
          
10
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
11
import org.springframework.cloud.stream.binder.ConsumerProperties;
12
import org.springframework.cloud.stream.binder.ProducerProperties;
13
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
14
import org.springframework.cloud.stream.provisioning.ProducerDestination;
15
import org.springframework.integration.core.MessageProducer;
16
import org.springframework.messaging.MessageChannel;
17
import org.springframework.messaging.MessageHandler;
18
 
          
19
import static java.nio.file.StandardOpenOption.*;
20
 
          
21
public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {
22
 
          
23
    public FileMessageBinder(
24
            String[] headersToEmbed,
25
            FileMessageBinderProvisioner provisioningProvider) {
26
 
          
27
        super(headersToEmbed, provisioningProvider);
28
    }
29
 
          
30
    @Override
31
    protected MessageHandler createProducerMessageHandler(
32
            final ProducerDestination destination,
33
            final ProducerProperties producerProperties,
34
            final MessageChannel errorChannel) throws Exception {
35
 
          
36
        return message -> {
37
            String fileName = destination.getName();
38
            String payload = new String((byte[])message.getPayload()) + "\n";
39
 
          
40
            try {
41
                Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
42
            } catch (IOException e) {
43
                throw new RuntimeException(e);
44
            }
45
        };
46
    }
47
 
          
48
    @Override
49
    protected MessageProducer createConsumerEndpoint(
50
            final ConsumerDestination destination,
51
            final String group,
52
            final ConsumerProperties properties) throws Exception {
53
 
          
54
        return new FileMessageProducer(destination);
55
    }
56
 
          
57
}



Last but not least, we need to provide the Spring Configuration for our binder as follows:

Java




xxxxxxxxxx
1
25


 
1
package com.example.springcloudstreamcustombinder.config;
2
 
          
3
import com.example.springcloudstreamcustombinder.FileMessageBinder;
4
import com.example.springcloudstreamcustombinder.provisioners.FileMessageBinderProvisioner;
5
 
          
6
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
7
import org.springframework.context.annotation.Bean;
8
import org.springframework.context.annotation.Configuration;
9
 
          
10
@Configuration
11
public class FileMessageBinderConfiguration {
12
 
          
13
    @Bean
14
    @ConditionalOnMissingBean
15
    public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
16
        return new FileMessageBinderProvisioner();
17
    }
18
 
          
19
    @Bean
20
    @ConditionalOnMissingBean
21
    public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
22
        return new FileMessageBinder(null, fileMessageBinderProvisioner);
23
    }
24
 
          
25
}



And the related src/main/resources/META-INF/spring.binders file with the binder name followed by the qualified name of the binder’s Spring Configuration:

Java




xxxxxxxxxx
1


1
myFileBinder:\ 
2
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration



Congratulations! Your custom binder implementation is now complete and can be installed in your local Maven repository by running mvn clean install

Testing the Custom Binder

Head over to start.spring.io and generate a Spring Boot 2.2.0 project with Cloud Stream as the only required dependency (or just click on this link instead, and generate the project from there).

Add your custom binder dependency to the pom.xml file, in the dependencies section:

XML




xxxxxxxxxx
1


 
1
<dependency>
2
    <groupId>com.example</groupId>
3
    <artifactId>spring-cloud-stream-custom-binder</artifactId>
4
    <version>0.0.1-SNAPSHOT</version>
5
</dependency>



Replace the src/main/resources/application.properties file with the following application.yml file, which enables logging for all events that are managed by Spring Cloud Stream:

Java




xxxxxxxxxx
1


 
1
logging:
2
    level:
3
        org.springframework.cloud.stream.messaging.DirectWithAttributesChannel: DEBUG



Replace the src/main/java/SpringCloudStreamCustomBinderDemo.java file contents with the following:

Java
xxxxxxxxxx
1
15
1
@SpringBootApplication
2
@EnableBinding({Sink.class, Source.class})
3
public class SpringCloudStreamCustomBinderDemoApplication {
4
 
          
5
    public static void main(String[] args) {
6
        SpringApplication.run(SpringCloudStreamCustomBinderDemoApplication.class, args);
7
    }
8
 
          
9
    @StreamListener(Sink.INPUT)
10
    @SendTo(Source.OUTPUT)
11
    public String handle(String message) {
12
        return String.format("Received: %s", message);
13
    }
14
 
          
15
}


Finally, add a file named “input” to the main project directory and write something to it.

With this configuration in place, you can now launch the application by running mvn spring-boot:run, relying on the custom Spring Cloud Stream binder we just implemented to: keep consuming events from the “input” file you just created; write the processing results to an output file named “output”; and keep track of all previously read messages in the “archive.txt” file.

Conclusion

The official Spring Cloud Stream reference documentation is quite helpful when it comes down to implementing your own binder but it’s definitely not enough.

That being held, creating your own binder implementation is almost effortless even though it might appear like a hard task at first; also, knowing how to build a custom Spring Cloud Stream binder makes for a quite niche and useful skill in the development field!

Moreover, knowing how Spring Cloud Stream binders work makes it easy to customize already existing ones, and not just building new binders (which, in all honesty, might be a less common scenario given most use cases).

References & Useful Resources

If you liked what you just read, please consider following me on Medium and Twitter for more articles like this!

Further Reading

Spring Cloud Stream With Kafka

Getting Started With Spring Cloud Stream

Building and Testing Message-Driven Microservices Using Spring Cloud Stream

Topics:
java ,spring ,spring boot ,spring cloud stream ,events ,microservices ,microservice communication ,kafka ,spring tips ,binders

Published at DZone with permission of Domenico Sibilio . See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}