Spring Tips: Apache RocketMQ

By Josh Long · Apr. 13, 20 · Tutorial

Hi, Spring fans! In this installment of Spring Tips, we’re going to look at Alibaba’s Apache RocketMQ. We’ve talked some about Alibaba in Spring Tips before. Check out the earlier Spring Tips installment in which we explore some of Spring Cloud Alibaba.

Running Apache RocketMQ

To use Apache RocketMQ, you’ll need to follow the steps in the RocketMQ quickstart. RocketMQ was originally developed and used internally at Alibaba, and it was proven in the forge of 11/11, the famous Chinese sales holiday that is sort of like “Cyber Monday” or “Black Friday” in the US. Sort of like that, but waaaaaay bigger. In 2019, Alibaba alone, with no other e-commerce engines involved, made almost $40 billion in 24 hours. That required sending trillions of messages through something that could scale to meet the demand. RocketMQ was the only thing they could trust.

You’ll need to use Java 8 when running Apache RocketMQ. (You can use any version of Java when writing Spring applications that connect to Apache RocketMQ, of course.) I use SDKMAN! (the sdk command) to switch to the appropriate version of Java.

Shell

sdk use java 8.0.242.hs-adpt

Depending on your SDKMAN! release, that either offers to install the version if it’s missing or asks you to run sdk install java 8.0.242.hs-adpt first. Once that’s done, you’ll need to run the NameServer.

Shell

${ROCKETMQ_HOME}/bin/mqnamesrv

Then you’ll need to run the Broker itself.

Shell

${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876

If you want to use SQL-based filtering, you need to add a property to the broker’s configuration, $ROCKETMQ_HOME/conf/broker.conf, and then tell the broker to use that configuration by passing it with the -c flag.

Properties files

enablePropertyFilter = true

I use a script like this to launch everything.

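Shell

# Point RocketMQ at the Java 8 installation managed by SDKMAN!
export JAVA_HOME=$HOME/.sdkman/candidates/java/8.0.242.hs-adpt
# Start the NameServer in the background
${ROCKETMQ_HOME}/bin/mqnamesrv &
# Start the broker in the foreground, using the edited broker.conf
${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876 -c ${ROCKETMQ_HOME}/conf/broker.conf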

Using Apache RocketMQ from Java Code

Let’s look at a simple producer class that uses the Spring Boot autoconfiguration and the RocketMQTemplate.

In order to work with this, you’ll need to create a new project on the Spring Initializr. I generated a new project with the latest version of Java and then I made sure to include Lombok. We also need the Apache RocketMQ client and the appropriate Spring Boot autoconfiguration:

XML

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

The autoconfiguration will create a connection to the running Apache RocketMQ broker, informed by certain properties.

Properties files

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=greetings-producer-group

The first property, name-server, tells the application where the Apache RocketMQ nameserver lives. The nameserver, in turn, knows where the broker lives. You’ll also need to specify a group for both the producer and the consumer. Here, the producer uses greetings-producer-group.

Java

package com.example.producer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;

import java.time.Instant;

@RequiredArgsConstructor
@SpringBootApplication
public class ProducerApplication {

    // Runs once the application has started and sends one greeting per name.
    @Bean
    ApplicationListener<ApplicationReadyEvent> ready(RocketMQTemplate template) {
        return event -> {

            var now = Instant.now();
            var destination = "greetings-topic";

            for (var name : "Tammie,Kimly,Josh,Rob,Mario,Mia".split(",")) {

                var payload = new Greeting("Hello @ " + name + " @ " + now.toString());
                var messagePostProcessor = new MessagePostProcessor() {

                    // Adds a "letter" header holding the lowercased first letter of the name.
                    @Override
                    public Message<?> postProcessMessage(Message<?> message) {
                        var headerValue = Character.toString(name.toLowerCase().charAt(0));
                        return MessageBuilder
                            .fromMessage(message)
                            .setHeader("letter", headerValue)
                            .build();
                    }
                };
                template.convertAndSend(destination, payload, messagePostProcessor);
            }
        };
    }

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

// The message payload.
@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
    private String message;
}

I don’t know if it can get much simpler than that! It’s a simple for-loop that processes each name, creates a new Greeting object, and then uses the RocketMQTemplate to send the payload to an Apache RocketMQ topic, greetings-topic. Here, we’ve used the convertAndSend overload on RocketMQTemplate that accepts a MessagePostProcessor. The MessagePostProcessor is a callback in which we can transform the Spring Framework Message object before it is sent. In this example, we contribute a header value, letter, that contains the lowercased first letter of the name. We’ll use this in the consumer.
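To see which header values the producer ends up attaching, here is a minimal plain-Java sketch of just the header computation. It does not touch RocketMQ or Spring. The class name LetterHeaders and its two methods are hypothetical helpers introduced for illustration only; the expression inside letterFor is the one used in the MessagePostProcessor.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the producer application: it mirrors only
// the computation of the "letter" header value.
final class LetterHeaders {

    // Same expression as in the producer: first character of the lowercased name.
    static String letterFor(String name) {
        return Character.toString(name.toLowerCase().charAt(0));
    }

    // Maps each name in a comma-separated list to its "letter" header value,
    // preserving the order in which the names appear.
    static Map<String, String> headersFor(String csvNames) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String name : csvNames.split(",")) {
            result.put(name, letterFor(name));
        }
        return result;
    }

    public static void main(String[] args) {
        headersFor("Tammie,Kimly,Josh,Rob,Mario,Mia")
            .forEach((name, letter) -> System.out.println(name + " -> letter=" + letter));
    }
}
```

Note that Mario and Mia both produce m, and Rob produces r, which matters once a consumer starts filtering on this header.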

Let’s look at the consumer. Generate a new Spring Boot application from the Spring Initializr and be sure to add the Apache RocketMQ autoconfiguration. You’ll need to specify the name server in application.properties for the client, too.

The autoconfiguration supports defining beans that implement RocketMQListener<T>, where T is the type of the payload that the consumer will receive. The payload, in this case, is the Greeting.

Java

package com.example.consumer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;

// Used by the selector-based listeners shown later.
import static org.apache.rocketmq.spring.annotation.SelectorType.SQL92;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

// Mirrors the payload type sent by the producer.
@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
    private String message;
}

// Receives every message published to greetings-topic.
@Log4j2
@Service
@RocketMQMessageListener(
    topic = "greetings-topic",
    consumerGroup = "simple-group"
)
class SimpleConsumer implements RocketMQListener<Greeting> {

    @Override
    public void onMessage(Greeting greeting) {
        log.info(greeting.toString());
    }
}

In this example, the SimpleConsumer simply logs all incoming messages from the greetings-topic topic in Apache RocketMQ. Here, the consumer will process every message on the topic. Let’s look at another nice feature, selectors, that lets us selectively process incoming messages. (This is what the enablePropertyFilter setting in broker.conf is for.) Let’s replace the existing RocketMQ listener with two new ones. Each one will use a SQL92-compatible predicate to determine whether incoming messages should be processed. One listener processes only the messages that have a letter header matching m, k, or t. The other processes only those whose letter header matches j.
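Before wiring up the listeners, it can help to check which of the producer’s names each predicate would select. The following is a plain-Java stand-in for the two predicates, not RocketMQ’s actual filtering, which happens on the broker. The class name SelectorSketch and its members are hypothetical, introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java approximation of the two SQL92 selector expressions.
// No RocketMQ classes are involved; this only models the predicates.
final class SelectorSketch {

    // Stand-in for: letter = 'm' or letter = 'k' or letter = 't'
    static final Predicate<String> MKT =
        letter -> letter.equals("m") || letter.equals("k") || letter.equals("t");

    // Stand-in for: letter = 'j'
    static final Predicate<String> J = letter -> letter.equals("j");

    // Returns the names whose lowercased first letter satisfies the selector.
    static List<String> matching(String csvNames, Predicate<String> selector) {
        List<String> matched = new ArrayList<>();
        for (String name : csvNames.split(",")) {
            String letter = name.substring(0, 1).toLowerCase();
            if (selector.test(letter)) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        String names = "Tammie,Kimly,Josh,Rob,Mario,Mia";
        System.out.println("m/k/t listener: " + matching(names, MKT));
        System.out.println("j listener:     " + matching(names, J));
        System.out.println("neither:        " + matching(names, MKT.or(J).negate()));
    }
}
```

Under these assumptions, Rob’s greeting (letter r) satisfies neither predicate, so neither of the two listeners that follow should log it.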

Java

@Log4j2
@Service
@RocketMQMessageListener(
    topic = "greetings-topic",
    selectorExpression = " letter = 'm' or letter = 'k' or letter = 't' ",
    selectorType = SQL92,
    consumerGroup = "sql-consumer-group-mkt"
)
class MktSqlSelectorConsumer implements RocketMQListener<Greeting> {

    @Override
    public void onMessage(Greeting greeting) {
        log.info("'m', 'k', 't': " + greeting.toString());
    }
}

@Log4j2
@Service
@RocketMQMessageListener(
    topic = "greetings-topic",
    selectorExpression = " letter = 'j' ",
    selectorType = SQL92,
    consumerGroup = "sql-consumer-group-j"
)
class JSqlSelectorConsumer implements RocketMQListener<Greeting> {

    @Override
    public void onMessage(Greeting greeting) {
        log.info("'j': " + greeting.toString());
    }
}

Not bad, eh? There are plenty of other things that Apache RocketMQ supports (besides processing trillions of messages in 24 hours!). It can store long-tail messages on disk without degrading performance. It supports serialization (that is, ordering) of messages, transactions, batch processing, etc. It even supports scheduled messages: messages that are only delivered after a certain interval. Needless to say, I’m a big Apache RocketMQ fan.


Published at DZone with permission of Josh Long. See the original article here.
