How to Create a Spring Cloud Stream Binder From Scratch

Learn how to develop a custom Spring Cloud Stream binder from scratch.

By Domenico Sibilio · Dec. 20, 19 · Tutorial

Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration that is designed to build event-driven microservices communicating via one or more shared messaging systems.

The core Spring Cloud Stream component is called “Binder,” a crucial abstraction that’s already been implemented for the most common messaging systems (e.g. Apache Kafka, Kafka Streams, Google PubSub, RabbitMQ, Azure EventHub, and Azure ServiceBus).



In this article, we’ll see in detail how to develop a custom Spring Cloud Stream binder from scratch.

Introduction

The official Spring Cloud Stream documentation already provides a very basic explanation of how to implement your own Spring Cloud Stream binder.

Here’s a brief excerpt from it about the Binder Service Provider Interface that must be implemented in order to create a custom binder:

The Binder SPI consists of a number of interfaces, out-of-the box utility classes, and discovery strategies that provide a pluggable mechanism for connecting to external middleware. The key point of the SPI is the Binder interface, which is a strategy for connecting inputs and outputs to external middleware. The following listing shows the definition of the Binder interface:
Java
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}



And here’s one more documentation snippet that’s basically a micro-tutorial about developing Spring Cloud Stream binders:

A typical binder implementation consists of the following: a class that implements the Binder interface; a Spring @Configuration class that creates a bean of type Binder along with the middleware connection infrastructure; a META-INF/spring.binders file found on the classpath containing one or more binder definitions, as shown in the following example:
Properties
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration



Although the documentation above is enough to get started, a more detailed guide accompanied by a practical example makes the process much easier to follow.

TL;DR: Just Gimme the Code

If you don't have the time to go through this detailed tutorial, you can jump to my demo on GitHub, which includes a custom file-based Spring Cloud Stream binder like the one shown below and a sample application that depends on it.

Developing the Custom Binder

Let’s get our hands dirty by developing a custom Spring Cloud Stream binder that consumes events by reading files and produces events by writing to files!

Create a new Maven project with a pom.xml file similar to the following, which includes the dependency to Spring Cloud Stream:


XML
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>spring-cloud-stream-custom-binder</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud-stream-custom-binder</name>
    <description>A demo custom Spring Cloud Stream Binder</description>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring-cloud.version>Hoxton.RC1</spring-cloud.version>
        <spring-boot.version>2.2.0.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>



Technically, we just need to provide our org.springframework.cloud.stream.binder.Binder implementation, but practically, the binder depends on two more components that we need to provide first: the ProvisioningProvider and the MessageProducer.

The ProvisioningProvider is responsible for provisioning consumer and producer destinations. It is particularly useful for converting the logical destinations declared in the application.yml or application.properties file into physical destination references (you could look Spring beans up by destination name, or simply trim the destination name as we do in the following snippet):

Java
package com.example.springcloudstreamcustombinder.provisioners;

import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;

public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {

    @Override
    public ProducerDestination provisionProducerDestination(
            final String name,
            final ProducerProperties properties) {

        return new FileMessageDestination(name);
    }

    @Override
    public ConsumerDestination provisionConsumerDestination(
            final String name,
            final String group,
            final ConsumerProperties properties) {

        return new FileMessageDestination(name);
    }

    // The same destination type serves both the producer and the consumer side.
    private static class FileMessageDestination implements ProducerDestination, ConsumerDestination {

        private final String destination;

        private FileMessageDestination(final String destination) {
            this.destination = destination;
        }

        @Override
        public String getName() {
            return destination.trim();
        }

        @Override
        public String getNameForPartition(int partition) {
            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
        }

    }

}
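
To make the logical-to-physical mapping more concrete, here is a minimal, framework-free sketch of the same idea. The class and method names (DestinationResolutionSketch, toPhysicalDestination) are hypothetical and are not part of Spring Cloud Stream or of the demo project; the blank-name check is an extra assumption that the provisioner above does not make.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical stand-in for the provisioning step; names are illustrative only.
final class DestinationResolutionSketch {

    /** Maps a logical destination name to a file path relative to the working directory. */
    static Path toPhysicalDestination(String logicalName) {
        // Assumption: blank names are rejected early instead of producing an empty path.
        if (logicalName == null || logicalName.trim().isEmpty()) {
            throw new IllegalArgumentException("Destination name must not be blank");
        }
        // Same rule as FileMessageDestination.getName(): the name is simply trimmed.
        return Paths.get(logicalName.trim());
    }

    public static void main(String[] args) {
        System.out.println(toPhysicalDestination("  input "));
    }
}
```

A real provisioner could apply any other rule here, such as prefixing a base directory, as long as producer and consumer agree on the resulting name.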



Contrary to what its name suggests, the MessageProducer is responsible for consuming events and handing them over as messages to the client application that is configured to consume them.

Here is an example of a MessageProducer implementation that polls a file whose name matches the trimmed destination name and that is located in the project path. It also archives the messages it reads and discards consecutive identical messages:

Java
package com.example.springcloudstreamcustombinder.producers;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import static java.nio.file.StandardOpenOption.*;
import static java.util.concurrent.TimeUnit.*;

public class FileMessageProducer extends MessageProducerSupport {

    public static final String ARCHIVE = "archive.txt";
    private final ConsumerDestination destination;
    private String previousPayload;
    private ScheduledExecutorService executorService;

    public FileMessageProducer(ConsumerDestination destination) {
        this.destination = destination;
    }

    @Override
    public void doStart() {
        receive();
    }

    @Override
    protected void doStop() {
        // Stop polling when the endpoint is stopped so the polling thread does not leak.
        if (executorService != null) {
            executorService.shutdownNow();
        }
    }

    private void receive() {
        executorService = Executors.newScheduledThreadPool(1);

        // Poll the file every 50 ms and forward any new payload to the bound channel.
        executorService.scheduleWithFixedDelay(() -> {
            String payload = getPayload();

            if (payload != null) {
                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
                archiveMessage(payload);
                sendMessage(receivedMessage);
            }

        }, 0, 50, MILLISECONDS);
    }

    private String getPayload() {
        try {
            List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));

            // An empty file has no last line; without this check the task would fail and stop polling.
            if (allLines.isEmpty()) {
                return null;
            }

            String currentPayload = allLines.get(allLines.size() - 1);

            if (!currentPayload.equals(previousPayload)) {
                previousPayload = currentPayload;
                return currentPayload;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return null;
    }

    private void archiveMessage(String payload) {
        try {
            Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
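
The polling and de-duplication logic can also be looked at in isolation. The following is a minimal sketch that uses only the JDK; LastLinePoller and its demo method are hypothetical names introduced for illustration, and treating a missing file as "no message" is an assumption rather than the behavior of the producer above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical, framework-free helper; it is not part of the binder.
final class LastLinePoller {

    private final Path file;
    private String previousPayload;

    LastLinePoller(Path file) {
        this.file = file;
    }

    /** Returns the last line of the file if it differs from the line returned previously. */
    Optional<String> poll() {
        if (!Files.exists(file)) {
            return Optional.empty(); // assumption: a missing file means nothing to consume
        }
        List<String> lines;
        try {
            lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (lines.isEmpty()) {
            return Optional.empty(); // an empty file carries no message
        }
        String current = lines.get(lines.size() - 1);
        if (current.equals(previousPayload)) {
            return Optional.empty(); // identical to the last delivered message
        }
        previousPayload = current;
        return Optional.of(current);
    }

    /** Polls a temporary file three times: while empty, after one write, and unchanged. */
    static List<Optional<String>> demo() {
        try {
            Path input = Files.createTempFile("input", ".txt");
            LastLinePoller poller = new LastLinePoller(input);
            Optional<String> whileEmpty = poller.poll();
            Files.write(input, "hello\n".getBytes(StandardCharsets.UTF_8));
            Optional<String> afterWrite = poller.poll();
            Optional<String> unchanged = poller.poll();
            Files.delete(input);
            return Arrays.asList(whileEmpty, afterWrite, unchanged);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the second poll yields a value, which is the same effect the previousPayload field has in the producer: a line is delivered once and then ignored until the last line of the file changes.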



Finally, with all the required components in place, we can implement our own Binder by extending the AbstractMessageChannelBinder class, providing the required constructors and overriding the inherited abstract methods:

Java
package com.example.springcloudstreamcustombinder;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import com.example.springcloudstreamcustombinder.producers.FileMessageProducer;
import com.example.springcloudstreamcustombinder.provisioners.FileMessageBinderProvisioner;

import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import static java.nio.file.StandardOpenOption.*;

public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {

    public FileMessageBinder(
            String[] headersToEmbed,
            FileMessageBinderProvisioner provisioningProvider) {

        super(headersToEmbed, provisioningProvider);
    }

    // Producer side: every outgoing message is appended as a new line to the destination file.
    @Override
    protected MessageHandler createProducerMessageHandler(
            final ProducerDestination destination,
            final ProducerProperties producerProperties,
            final MessageChannel errorChannel) throws Exception {

        return message -> {
            String fileName = destination.getName();
            String payload = new String((byte[]) message.getPayload()) + "\n";

            try {
                Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    // Consumer side: delegate to the file-polling MessageProducer.
    @Override
    protected MessageProducer createConsumerEndpoint(
            final ConsumerDestination destination,
            final String group,
            final ConsumerProperties properties) throws Exception {

        return new FileMessageProducer(destination);
    }

}
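
The producer side boils down to appending one line per message. As a rough, JDK-only sketch of that step: FileAppendSketch, appendLine, and demo are hypothetical names, and the explicit UTF-8 charset is an assumption (the binder above relies on the platform default).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

// Hypothetical helper that mirrors what the producer message handler does with each payload.
final class FileAppendSketch {

    /** Appends the payload as one line, creating the file on first use. */
    static void appendLine(Path file, byte[] payload) {
        String line = new String(payload, StandardCharsets.UTF_8) + "\n";
        try {
            // CREATE makes the file if it is missing; APPEND keeps earlier messages intact.
            Files.write(file, line.getBytes(StandardCharsets.UTF_8), CREATE, APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes two messages to a temporary file and returns the lines that end up in it. */
    static List<String> demo() {
        try {
            Path directory = Files.createTempDirectory("file-binder");
            Path output = directory.resolve("output");
            appendLine(output, "first".getBytes(StandardCharsets.UTF_8));
            appendLine(output, "second".getBytes(StandardCharsets.UTF_8));
            List<String> lines = Files.readAllLines(output, StandardCharsets.UTF_8);
            Files.delete(output);
            Files.delete(directory);
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the file is opened in append mode, the output file behaves like a simple log of produced messages, one per line and in order.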



Last but not least, we need to provide the Spring Configuration for our binder as follows:

Java
package com.example.springcloudstreamcustombinder.config;

import com.example.springcloudstreamcustombinder.FileMessageBinder;
import com.example.springcloudstreamcustombinder.provisioners.FileMessageBinderProvisioner;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FileMessageBinderConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
        return new FileMessageBinderProvisioner();
    }

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
        return new FileMessageBinder(null, fileMessageBinderProvisioner);
    }

}



And the related src/main/resources/META-INF/spring.binders file with the binder name followed by the qualified name of the binder’s Spring Configuration:


Properties
myFileBinder:\
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration
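
The entry above spans two lines because of the trailing backslash. Assuming the file is read with the standard java.util.Properties rules, the short sketch below shows how such an entry resolves to a single name/class pair. SpringBindersFormatSketch and parse are hypothetical names used only for illustration; this is not how Spring Cloud Stream itself is invoked.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: it only illustrates the file syntax, not the framework's lookup code.
final class SpringBindersFormatSketch {

    /** Parses text written in java.util.Properties syntax. */
    static Properties parse(String content) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        // The backslash at the very end of the first line continues the entry on the next line.
        String content = "myFileBinder:\\\n"
                + "com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration\n";
        Properties binders = parse(content);
        binders.forEach((name, configuration) -> System.out.println(name + " -> " + configuration));
    }
}
```

Under these rules the backslash must be the last character on its line: a space after it turns the sequence into an escaped space, and the class name on the following line is then read as a separate key.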



Congratulations! Your custom binder implementation is now complete and can be installed in your local Maven repository by running mvn clean install.

Testing the Custom Binder

Head over to start.spring.io and generate a Spring Boot 2.2.0 project with Cloud Stream as the only required dependency (or just click on this link instead, and generate the project from there).

Add your custom binder dependency to the pom.xml file, in the dependencies section:


XML
<dependency>
    <groupId>com.example</groupId>
    <artifactId>spring-cloud-stream-custom-binder</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>



Replace the src/main/resources/application.properties file with the following application.yml file, which enables logging for all events that are managed by Spring Cloud Stream:


YAML
logging:
    level:
        org.springframework.cloud.stream.messaging.DirectWithAttributesChannel: DEBUG



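Replace the contents of the generated SpringCloudStreamCustomBinderDemoApplication.java file (under src/main/java) with the following, keeping the package declaration that was generated for your project: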


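Java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding({Sink.class, Source.class})
public class SpringCloudStreamCustomBinderDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamCustomBinderDemoApplication.class, args);
    }

    // Consumes each message from the "input" binding and sends the result to the "output" binding.
    @StreamListener(Sink.INPUT)
    @SendTo(Source.OUTPUT)
    public String handle(String message) {
        return String.format("Received: %s", message);
    }

}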



Finally, add a file named “input” to the main project directory and write something to it.

With this configuration in place, you can now launch the application by running mvn spring-boot:run, relying on the custom Spring Cloud Stream binder we just implemented to: keep consuming events from the “input” file you just created; write the processing results to an output file named “output”; and keep track of all previously read messages in the “archive.txt” file.
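
To see what the three files should contain after one message has been handled, here is a framework-free approximation of a single pass through that flow. It assumes the file names used above (input, output, archive.txt); FileBinderFlowSketch, processOnce, and demo are hypothetical names, and the sketch skips the de-duplication and scheduling that the real binder performs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

// Hypothetical simulation of one consume-process-produce pass; no Spring involved.
final class FileBinderFlowSketch {

    /** Mirrors the demo handler: prefixes the incoming message. */
    static String handle(String message) {
        return String.format("Received: %s", message);
    }

    /** Reads the last line of "input", archives it, and appends the handled result to "output". */
    static void processOnce(Path directory) {
        try {
            List<String> lines = Files.readAllLines(directory.resolve("input"), UTF_8);
            if (lines.isEmpty()) {
                return; // nothing to consume yet
            }
            String payload = lines.get(lines.size() - 1);
            append(directory.resolve("archive.txt"), payload);
            append(directory.resolve("output"), handle(payload));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void append(Path file, String line) throws IOException {
        Files.write(file, (line + "\n").getBytes(UTF_8), CREATE, APPEND);
    }

    /** Runs one pass in a temporary directory and returns the archive lines followed by the output lines. */
    static List<String> demo() {
        try {
            Path directory = Files.createTempDirectory("binder-flow");
            Files.write(directory.resolve("input"), "hello\n".getBytes(UTF_8));
            processOnce(directory);
            List<String> result = new ArrayList<>();
            result.addAll(Files.readAllLines(directory.resolve("archive.txt"), UTF_8));
            result.addAll(Files.readAllLines(directory.resolve("output"), UTF_8));
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With "hello" as the last line of the input file, the archive receives the raw message and the output file receives the handler's result, which is the pattern to look for when checking the files produced by the running application.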

Conclusion

The official Spring Cloud Stream reference documentation is quite helpful when it comes to implementing your own binder, but on its own it is not enough.

That being said, creating your own binder implementation is almost effortless, even though it might look like a hard task at first. Knowing how to build a custom Spring Cloud Stream binder is also a niche yet useful skill to have as a developer.

Moreover, knowing how Spring Cloud Stream binders work makes it easier to customize existing binders, not just to build new ones (which, in all honesty, is the less common scenario for most use cases).

References & Useful Resources

  • The latest Spring Cloud Stream documentation
  • My demo custom Spring Cloud Stream binder on GitHub



Published at DZone with permission of Domenico Sibilio. See the original article here.
