Partitioning With Apache Kafka and Vert.x
Learn about the main features of Apache Kafka that may be useful when creating apps that consume messages, and learn about using it with Vert.x.
Apache Kafka is a distributed streaming platform that can also act as the messaging system in your architecture. Traditional message brokers provide two models of communication: queuing and publish-subscribe (topics). Queues are used for point-to-point messaging, while topics allow you to broadcast data to multiple target consumers. Kafka does not provide a separate queuing mechanism. However, it does introduce the consumer group concept, which generalizes both the queuing and publish-subscribe models. The consumer group mechanism guarantees that each message is delivered to only one consumer belonging to a given group. This is especially useful when you run more than one instance of a service that listens for messages arriving on the topic. Within the same group, your consumers then behave like queuing clients.
Eclipse Vert.x is a lightweight and fast toolkit for building reactive applications on the JVM. I have already introduced it in some of my previous posts, for example, Asynchronous Microservices With Vert.x. Vert.x doesn't force you to implement a reactive application. You may create a standard service that processes HTTP requests asynchronously, in accordance with the asynchronous I/O concept.
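To make the consumer group idea more concrete, here is a small plain-Java sketch that does not use Kafka at all. The class GroupDelivery, its method names, and the "partition modulo group size" ownership rule are my own simplifications for illustration, not Kafka's actual assignment algorithm. It only shows the two guarantees described above: every group receives each record, and inside a group exactly one member receives it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model: every group sees each record once; inside a group one member owns each partition. */
final class GroupDelivery {
    // group name -> member names, in join order
    private final Map<String, List<String>> groups = new LinkedHashMap<>();

    /** Registers a member in a group, creating the group on first use. */
    void join(String group, String member) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(member);
    }

    /** Returns, for each group, the single member that receives a record from the given partition. */
    Map<String, String> receivers(int partition) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            List<String> members = e.getValue();
            // Simplified ownership rule (an assumption): partition p belongs to member p mod groupSize.
            result.put(e.getKey(), members.get(partition % members.size()));
        }
        return result;
    }
}
```

With two members in one group and a single member in another, a record from partition 0 reaches the first member of the first group and the only member of the second group. That is the queue-like behavior inside a group combined with publish-subscribe behavior across groups.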
The Purpose of This Article
The main purpose of this article is to show you the main features of Apache Kafka that may be useful when creating applications that consume messages. The choice of Java client library is not the key point here. However, in my opinion, Vert.x as an asynchronous, high-performance framework matches Apache Kafka very well. It provides the Vert.x Kafka client, which allows you to read messages from and send messages to a Kafka cluster. Before we proceed to the sample, let's first dive into the core abstraction of Kafka.
Kafka Topics
I'm assuming you know what a topic is and what its main role is. Every message coming into the topic goes to every subscriber. What is the main difference between Kafka and the standard topics provided by other message brokers? Kafka topics are partitioned. Each partition is an ordered, immutable sequence of records. Every record can be uniquely identified within the partition by a sequential ID number called the offset. The Kafka cluster retains all published records according to the configured retention period.
The consumer may subscribe to the whole topic or only to a selected partition. It can also control the offset from which it starts processing data. For example, it can reset the offset in order to reprocess data from the past, or skip ahead to the most recent record and consume only messages sent to the topic from now on. The figure below illustrates a single partition structure with producers and consumers listening for incoming data.
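The partition abstraction can be pictured as an append-only list in which the index of a record is its offset. The sketch below is a toy in-memory model written for this explanation. The class PartitionLog and its methods are hypothetical names and have nothing to do with Kafka's real storage layer, retention, or replication.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy in-memory model of one partition: an append-only list whose index is the offset. */
final class PartitionLog {
    private final List<String> records = new ArrayList<>();

    /** Appends a record and returns the offset assigned to it. */
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    /** Returns the record stored at the given offset. */
    String read(long offset) {
        return records.get((int) offset);
    }

    /** Offset that the next appended record will receive. */
    long endOffset() {
        return records.size();
    }

    /** Read-only view of all records from the given offset onward. */
    List<String> readFrom(long offset) {
        return Collections.unmodifiableList(records.subList((int) offset, records.size()));
    }
}
```

Because records are never modified or reordered, a reader only needs to remember one number, the offset, to know exactly where it is in the partition.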
Sample Architecture
Let me explain the sample system architecture. Its source code is available on GitHub. In accordance with the principle that a picture speaks a thousand words, the diagram illustrating the architecture of our system is visible below. We have one topic created on the Kafka platform that consists of two partitions. There is one client application that exposes a REST API for sending orders into the system and forwards them to the topic. The target partition is calculated based on the type of order. We can create orders with type single or multiple. There are also some applications that consume data from the topic. The first, single-order-processor, reads data from partition 0; the second, multiple-order-processor, reads data from partition 1; and the last, all-order-processor, does not choose any partition.
Running Kafka
To run Apache Kafka on the local machine, we may use its Docker image. The image shared by Spotify also starts a ZooKeeper server, which is used by Kafka. If you run Docker on Windows, the default address of its virtual machine is 192.168.99.100.
docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.99.100 --env ADVERTISED_PORT=9092 spotify/kafka
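However, that option assumes the topics will be automatically created during application startup. I ran into some problems with it while creating a multi-partition topic. There is also another image, ches/kafka, which requires starting ZooKeeper separately but provides a Kafka client interface. The commands below reference a Docker network named kafka-net, so that network has to exist before they are run (for example, created with docker network create kafka-net).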
docker run -d --name zookeeper -p 2181:2181 zookeeper
docker run -d --name kafka -p 9092:9092 -p 7203:7203 --network kafka-net --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env ZOOKEEPER_IP=192.168.99.100 ches/kafka
Finally, we can run the ches/kafka container in client mode and then create the topic orders-out with two partitions.
docker run --rm --network kafka-net ches/kafka kafka-topics.sh --create --topic orders-out --replication-factor 1 --partitions 2 --zookeeper 192.168.99.100:2181
Created topic "orders-out".
Building Producer Applications
First, we need to include Maven dependencies to enable the Vert.x framework for the application. If the application exposes RESTful HTTP APIs, you should include vertx-web. The library vertx-kafka-client has to be included for all the sample modules.
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-kafka-client</artifactId>
    <version>3.5.0</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web</artifactId>
    <version>3.5.0</version>
</dependency>
To start Vert.x as a Java application, we have to create a verticle by extending AbstractVerticle. Then, the verticle needs to be deployed in the main method using a Vertx object. For more details about Vert.x and the concept of verticles, you may refer to my previous article, Asynchronous Microservices With Vert.x.
public class OrderVerticle extends AbstractVerticle {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new OrderVerticle());
    }
}
The next step is to define the producer using the KafkaProducer interface. We have to provide the connection settings and the serializer implementation class. You can choose between various built-in serializer implementations. The most suitable for me was JsonObjectSerializer, which requires JsonObject as an input parameter.
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonObjectSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "1");
KafkaProducer<String, JsonObject> producer = KafkaProducer.create(vertx, config);
The producer is invoked inside the POST method route definition. It returns an asynchronous response with a status after sending the message to the topic. The message is created using the KafkaProducerRecord interface. It takes the topic's name, the record key (null here), the request object, and the partition number as parameters. As you can see in the fragment of code below, the partition number is calculated based on the order type (o.getType().ordinal()).
Router router = Router.router(vertx);
router.route("/order/*").handler(ResponseContentTypeHandler.create());
router.route(HttpMethod.POST, "/order").handler(BodyHandler.create());
router.post("/order").produces("application/json").handler(rc -> {
    Order o = Json.decodeValue(rc.getBodyAsString(), Order.class);
    // Topic "orders-out", no key, JSON body as value, partition chosen by order type
    KafkaProducerRecord<String, JsonObject> record =
        KafkaProducerRecord.create("orders-out", null, rc.getBodyAsJson(), o.getType().ordinal());
    producer.write(record, done -> {
        if (done.succeeded()) {
            RecordMetadata recordMetadata = done.result();
            LOGGER.info("Record sent: msg={}, destination={}, partition={}, offset={}", record.value(), recordMetadata.getTopic(), recordMetadata.getPartition(), recordMetadata.getOffset());
            // The offset assigned by Kafka is reused as the order id
            o.setId(recordMetadata.getOffset());
            o.setStatus(OrderStatus.PROCESSING);
        } else {
            Throwable t = done.cause();
            LOGGER.error("Error sent to topic: {}", t.getMessage());
            o.setStatus(OrderStatus.REJECTED);
        }
        rc.response().end(Json.encodePrettily(o));
    });
});
vertx.createHttpServer().requestHandler(router::accept).listen(8090);
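Using ordinal() as the partition number works only as long as the enum has no more values than the topic has partitions. The following sketch isolates that mapping and adds a guard. The names OrderType and OrderPartitioner are my assumptions for illustration (the sample's own enum may be named differently), and the guard is an addition that is not part of the code above.

```java
/** Order types from the article; the declaration order fixes the ordinal values. */
enum OrderType { SINGLE, MULTIPLE }

final class OrderPartitioner {
    /** Maps an order type to a partition number, rejecting types that have no partition. */
    static int partitionFor(OrderType type, int partitionCount) {
        int partition = type.ordinal(); // SINGLE -> 0, MULTIPLE -> 1
        if (partition >= partitionCount) {
            throw new IllegalArgumentException(
                "No partition for " + type + " in a topic with " + partitionCount + " partitions");
        }
        return partition;
    }
}
```

For the two-partition topic used here, SINGLE maps to partition 0 and MULTIPLE to partition 1. Note that reordering the enum constants would silently change the routing, which is the main trade-off of relying on ordinal().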
Building Consumer Applications
The consumer configuration is very similar to that of the producer. We also have to set the connection settings and the class used for deserialization. There is one interesting setting that has been defined for the consumer in the fragment of code visible below: auto.offset.reset (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG). It determines the initial offset for the consumer when no committed offset exists for its group. If you would like to read all records from the beginning of the stream, use the value earliest. If you would like to process only the newest records (received after application startup), set that property to latest. Because in our case Kafka acts as a message broker, it is set to latest.
Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
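The effect of auto.offset.reset can be summarized as a small decision rule: a committed offset always wins, and the reset policy is consulted only when there is none. The sketch below models that rule in plain Java. StartPosition and its parameters are hypothetical names used for illustration, and this is a simplified model, not code from the Kafka client.

```java
import java.util.OptionalLong;

final class StartPosition {
    /**
     * Picks the offset a consumer starts from.
     * A committed offset wins; the reset policy applies only when none exists.
     */
    static long resolve(OptionalLong committed, String autoOffsetReset,
                        long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return beginningOffset; // replay everything still retained
            case "latest":
                return endOffset;       // only records produced from now on
            default:
                // Models the "none" policy, which reports an error instead of guessing
                throw new IllegalStateException(
                    "No committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }
}
```

For a partition holding offsets 0 to 4 and a consumer without a committed offset, latest yields 5 and earliest yields 0.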
As you probably remember, we have three different applications that subscribe to the topic. The first of them, implemented under the module all-order-processor, consumes all the events coming into the topic. This implementation is the simplest. We only need to invoke the subscribe method and pass the name of the topic as a parameter. Then, every incoming message is processed by the handler method.
consumer.subscribe("orders-out", ar -> {
    if (ar.succeeded()) {
        LOGGER.info("Subscribed");
    } else {
        LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
    }
});
consumer.handler(record -> {
    LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
    Order order = Json.decodeValue(record.value(), Order.class);
    order.setStatus(OrderStatus.DONE);
    LOGGER.info("Order processed: id={}, price={}", order.getId(), order.getPrice());
});
The implementation of the consuming method for the other applications is a little more complicated. Besides defining the target topic, every consumer can ask for a specific partition. The application multiple-order-processor is assigned to partition 1, while single-order-processor is assigned to partition 0.
TopicPartition tp = new TopicPartition().setPartition(1).setTopic("orders-out");
consumer.assign(tp, ar -> {
    if (ar.succeeded()) {
        LOGGER.info("Subscribed");
        consumer.assignment(done1 -> {
            if (done1.succeeded()) {
                for (TopicPartition topicPartition : done1.result()) {
                    LOGGER.info("Partition: topic={}, number={}", topicPartition.getTopic(), topicPartition.getPartition());
                }
            } else {
                LOGGER.error("Could not assign partition: err={}", done1.cause().getMessage());
            }
        });
    } else {
        LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
    }
});
The implementation of the handler method inside multiple-order-processor is pretty interesting. If it receives an order with a non-empty relatedOrderId field, it tries to find the related order in the historical records stored in the topic. This can be achieved by calling the seek method on KafkaConsumer.
consumer.handler(record -> {
    LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
    Order order = Json.decodeValue(record.value(), Order.class);
    // This record is the historical order that an earlier message was waiting for
    if (ordersWaiting.containsKey(record.offset())) {
        LOGGER.info("Related order found: id={}, price={}", order.getId(), order.getPrice());
        LOGGER.info("Current price: price={}", order.getPrice() + ordersWaiting.get(record.offset()).getPrice());
        // Jump back to the end of the partition to continue with new records
        consumer.seekToEnd(tp);
    }
    // A new order references an older one: remember it and rewind to the related offset
    if (order.getRelatedOrderId() != null && !ordersWaiting.containsKey(order.getRelatedOrderId())) {
        ordersWaiting.put(order.getRelatedOrderId(), order);
        consumer.seek(tp, order.getRelatedOrderId());
    }
});
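The rewind-and-return pattern in the handler above can be traced with a toy cursor over a list of prices, where the list index plays the role of the offset. PartitionCursor and RelatedOrderLookup are hypothetical classes written only to illustrate the sequence "read the new order, seek to the related offset, re-read it, seek to the end". They model the idea in plain Java and are not the Vert.x Kafka client API.

```java
import java.util.List;

/** Toy cursor over one partition, mimicking sequential reads plus seek and seekToEnd. */
final class PartitionCursor {
    private final List<Integer> prices; // index = offset, value = order price
    private int position;               // offset of the next record to read

    PartitionCursor(List<Integer> prices, int startPosition) {
        this.prices = prices;
        this.position = startPosition;
    }

    boolean hasNext() { return position < prices.size(); }

    /** Returns the offset of the record being read and advances by one. */
    int nextOffset() { return position++; }

    int priceAt(int offset) { return prices.get(offset); }

    void seek(int offset) { position = offset; }

    void seekToEnd() { position = prices.size(); }
}

final class RelatedOrderLookup {
    /** Reads the order at newOffset, jumps back to relatedOffset, and returns the combined price. */
    static int combinedPrice(List<Integer> prices, int newOffset, int relatedOffset) {
        PartitionCursor cursor = new PartitionCursor(prices, newOffset);
        int current = cursor.priceAt(cursor.nextOffset()); // consume the new order
        cursor.seek(relatedOffset);                        // rewind to the historical record
        int related = cursor.priceAt(cursor.nextOffset()); // re-read the related order
        cursor.seekToEnd();                                // skip records that were already processed
        if (cursor.hasNext()) {
            throw new IllegalStateException("cursor should be at the end of the partition");
        }
        return current + related;
    }
}
```

With the prices 400 at offset 0 and 500 at offset 1, which are the two MULTIPLE orders sent in the test section, looking up offset 0 from offset 1 gives a combined price of 900.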
Testing
Now, it is time to launch our applications. You can run the main classes from your IDE, or build the whole project using the mvn clean install command and then run each application with java -jar. Also, run two instances of the all-order-processor to check how the consumer group mechanism works in practice.
Let's send some test requests to the order-service in the following sequence.
curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":200}' http://localhost:8090/order
{"id":0,"type":"SINGLE","status":"PROCESSING","price":200}
curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":300}' http://localhost:8090/order
{"id":1,"type":"SINGLE","status":"PROCESSING","price":300}
curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":400}' http://localhost:8090/order
{"id":0,"type":"MULTIPLE","status":"PROCESSING","price":400}
curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId" :0}' http://localhost:8090/order
{"id":1,"type":"MULTIPLE","status":"PROCESSING","price":500}
Here's the log from the producer application.
2018-01-30 11:08:48 [INFO ] Record sent: msg={"type":"SINGLE","status":"NEW","price":200}, destination=orders-out, partition=0, offset=0
2018-01-30 11:08:57 [INFO ] Record sent: msg={"type":"SINGLE","status":"NEW","price":300}, destination=orders-out, partition=0, offset=1
2018-01-30 11:09:08 [INFO ] Record sent: msg={"type":"MULTIPLE","status":"NEW","price":400}, destination=orders-out, partition=1, offset=0
2018-01-30 11:09:27 [INFO ] Record sent: msg={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, destination=orders-out, partition=1, offset=1
Here's the log from the single-order-processor. It processes only messages from partition 0.
2018-01-30 11:08:48 [INFO ] Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ] Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1
Here's the log from the multiple-order-processor. It processes only messages from partition 1.
2018-01-30 11:09:08 [INFO ] Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ] Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1
Here's the log from the first instance of the all-order-processor.
2018-01-30 11:08:48 [INFO ] Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ] Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1
Here's the log from the second instance of the all-order-processor. If you run two instances of a consumer that listens to the whole topic in the same consumer group, each instance will process messages from a single partition.
2018-01-30 11:09:08 [INFO ] Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ] Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1
Summary
In this article, I provided some details on messaging with Apache Kafka. Concepts like consumer groups and partitioning make it different from traditional messaging solutions. Kafka is a widely adopted product that can act as storage, a messaging system, or a stream processor. Together with the popular JVM-based toolkit Vert.x, it can be a powerful, fast, and lightweight solution for applications that exchange messages with each other. The key concepts introduced by Kafka have also been adopted by Spring Cloud Stream, which makes it a good choice for creating messaging microservices.
Published at DZone with permission of Piotr Mińkowski, DZone MVB. See the original article here.