DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Because the DevOps movement has redefined engineering responsibilities, SREs now have to become stewards of observability strategy.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Related

  • Spring Cloud Stream Binding Kafka With EmbeddedKafkaRule Using In Tests
  • Spring Cloud Stream: A Brief Guide
  • Request Tracing in Spring Cloud Stream Data Pipelines With Kafka Binder
  • Component Tests for Spring Cloud Microservices

Trending

  • Security by Design: Building Full-Stack Applications With DevSecOps
  • Designing Fault-Tolerant Messaging Workflows Using State Machine Architecture
  • IoT and Cybersecurity: Addressing Data Privacy and Security Challenges
  • Exploring Intercooler.js: Simplify AJAX With HTML Attributes
  1. DZone
  2. Coding
  3. Java
  4. Spring Cloud Stream Channel Interceptor

Spring Cloud Stream Channel Interceptor

A Channel Interceptor is used to capture a message before being sent or received in order to view or modify it. Learn how a channel interceptor works and how to use it.

By 
Mohammed ZAHID user avatar
Mohammed ZAHID
DZone Core CORE ·
May. 05, 21 · Tutorial
Likes (4)
Comment
Save
Tweet
Share
14.5K Views

Join the DZone community and get the full member experience.

Join For Free

Introduction

A Channel Interceptor is a means to capture a message before being sent or received in order to view it or modify it. The channel interceptor allows having a structured code when we want to add extra message processing or embed additional data that are basically related to a technical aspect without affecting the business code.

The Message Interceptor is used in frameworks like Spring Cloud Sleuth and Spring Security to propagate tracing and security context through message queue by adding headers to message in the producer part, then reading them and restoring the context in the consumer part.

The message interceptor plays a similar role like a Servlet filter or an Aspect that can be added in a transparent way to both message producer and consumer as the following diagram shows:

 Message Broker Flow

  1. When the output channel sends a new message, the message goes through the channel interceptors before being delivered to the broker.
  2. When a new message received by the consumer, the message goes through the input channel interceptors before being handled by the message listener.

How It Works

This part explains roughly how Spring Cloud Stream works when sending and receiving a message. The descriptions below do not include the binders of specific broker implementations such as Kafka or RabbitMQ.

Bellow the part of ChannelInterceptor interface source code that interests us:

Java
 




x


 
1
public interface ChannelInterceptor {
2

          
3
    /**
4
     * Invoked before the Message is actually sent to the channel.
5
     * This allows for modification of the Message if necessary.
6
     * If this method returns {@code null} then the actual
7
     * send invocation will not occur.
8
     */
9
    @Nullable
10
    default Message<?> preSend(Message<?> message, MessageChannel channel) {
11
        return message;
12
    }
13

          
14
    /**
15
     * Invoked immediately after the send invocation. The boolean
16
     * value argument represents the return value of that invocation.
17
     */
18
    default void postSend(Message<?> message, MessageChannel channel, boolean sent) {
19
    }
20

          
21
    /**
22
     * Invoked after the completion of a send regardless of any exception that
23
     * have been raised thus allowing for proper resource cleanup.
24
     * <p>Note that this will be invoked only if {@link #preSend} successfully
25
     * completed and returned a Message, i.e. it did not return {@code null}.
26
     * @since 4.1
27
     */
28
    default void afterSendCompletion(
29
            Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) {
30
    }
31
  
32
    ...
33
}



Sending Message

  1. When sending a message, the MessageChannel calls the method MessageConverter.fromMassage() in order to perform message conversion.
  2. The MessageChannel calls the method ChannelInterceptor.preSend() using the message object returned from step 1. This method returns also the message object.
  3. The MessageChannel call the broker API implementation in order to send the returned message from step 2.
  4. The MessageChannel calls the methods ChannelInterceptor.postSend().
  5. Finally, The MessageChannel calls the method ChannelInterceptor.afterSendCompletion() with the exception type if thrown during the message sending.

Messaging Sending to Broker via Interceptor

Receiving Message

The message reception follows similar processing as the message sending when using the annotation @StreamListener instead of pulling messages. When a new message received, the SubscribableChannel calls also method ChannelInterceptor.preSend().

Keep in mind that ChannelInterceptor.preReceive() is invoked only when using PollableChannel and not StreamListener.

Create Your Own Channel Interceptor

In this example, we will create two global channel interceptors for both producer and consumer. The producer interceptor will add a new header to the original message and the received interceptor will take it and log it. This example can be applied to Spring Security context, MDC or any other information you want to propagate through a broker.

Project Preparation

If you are familiar with Spring Cloud Stream you can build your own test project, if not you can follow the steps described in this post Spring Cloud Stream With Kafka.

Output Channel Interceptor

Create a simple class that implements ChannelInterceptor. This class override the method preSend() by adding a new header to the message:

Java
 




xxxxxxxxxx
1
11


 
1
@Service
2
public class OutputChannelInterceptor implements ChannelInterceptor {
3

          
4
    @Override
5
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
6
        MessageHeaderAccessor mutableAccessor = MessageHeaderAccessor.getMutableAccessor(message);
7
        // Adding the message creation time header
8
        mutableAccessor.setHeader("Creation-Time", System.currentTimeMillis());
9
        return new GenericMessage<>(message.getPayload(), mutableAccessor.getMessageHeaders());
10
    }
11

          
12
}


Input Channel Interceptor

Create a simple class that implements ChannelInterceptor. This class overrides the method preSend() by retrieving the header injected in the previous class and logging it.

Java
 




xxxxxxxxxx
1
14


 
1
@Slf4j // lombok annotation for log
2
@Service
3
public class InputChannelInterceptor implements ChannelInterceptor {
4

          
5
    @Override
6
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
7
        MessageHeaders messageHeaders=message.getHeaders();
8
        if(messageHeaders.containsKey("Creation-Time")){ // Checking that the header exists
9
            // Retreiving and logging the message creation time value
10
            Long creationTime= (Long) messageHeaders.get("Creation-Time");
11
            log.debug("The message creation time is: {}",creationTime);
12
        }
13
        return message;
14
    }
15
}


Interceptors Configuration

The interceptor configuration can be done in two ways.

Using Global Channel Interceptor

The first way consists of adding the annotation @GlobalChannelIterceptor to the input and output interceptors by specifying the pattern for each one.

All the input channel names should have the same prefix or suffix, the same for output ones.

Java
 




xxxxxxxxxx
1


 
1

          
2
@Service
3
@GlobalChannelInterceptor(pattern={"*-out"})// output channel name is XXX-out
4
public class OutputChannelInterceptor implements ChannelInterceptor {
5
      ....
6
}



Java
 




xxxxxxxxxx
1


 
1
@Slf4j
2
@Service
3
@GlobalChannelInterceptor(pattern={"*-in"})// input channel name is XXX-in
4
public class InputChannelInterceptor implements ChannelInterceptor {
5
      ....
6
}



Using Bean Post Processor

The second way consists of creating a BeanPostProcessor class that injects the channel interceptors in each Message Channel according to its type.

Creates a configuration class that injects the interceptors in all message channels.

Java
 




x


 
1
@Configuration
2
public class MessagingConfiguration {
3

          
4
    @Bean
5
    public BeanPostProcessor channelsConfigurer(
6
      @Autowired inputChannelInterceptor inputChannelInterceptor,
7
      @Autowired OutputChannelInterceptor outputChannelInterceptor) {
8
        return new BeanPostProcessor() {
9

          
10
            @Override
11
            public Object postProcessBeforeInitialization(Object bean, String beanName) {
12
                return bean;
13
            }
14

          
15
            @Override
16
            public Object postProcessAfterInitialization(Object bean, String beanName) {
17
                if (bean instanceof AbstractMessageChannel) {
18
                    AbstractMessageChannel messageChannel = (AbstractMessageChannel) bean;
19
                    if (messageChannel.getAttribute("type").equals("input")) {
20
                      //The current bean is an input message channel
21
                        messageChannel.addInterceptor(inputChannelInterceptor);
22
                    } else {
23
                      //The current bean is an output message channel
24
                        messageChannel.addInterceptor(outputChannelInterceptor);
25
                    }
26
                }
27
                return bean;
28
            }
29

          
30
        };
31
    }
32
}



Spring Cloud Spring Framework Stream (computing) Spring Security kafka

Opinions expressed by DZone contributors are their own.

Related

  • Spring Cloud Stream Binding Kafka With EmbeddedKafkaRule Using In Tests
  • Spring Cloud Stream: A Brief Guide
  • Request Tracing in Spring Cloud Stream Data Pipelines With Kafka Binder
  • Component Tests for Spring Cloud Microservices

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!