DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workkloads.

Secure your stack and shape the future! Help dev teams across the globe navigate their software supply chain security challenges.

Releasing software shouldn't be stressful or risky. Learn how to leverage progressive delivery techniques to ensure safer deployments.

Avoid machine learning mistakes and boost model performance! Discover key ML patterns, anti-patterns, data strategies, and more.

Related

  • Evaluating Message Brokers
  • ActiveMQ JMS (Java Messaging Service) vs. Data Streaming Kafka With Camel Code Sample
  • Event-Driven Architectures: Designing Scalable and Resilient Cloud Solutions
  • How to Integrate Event-Driven Ansible With Kafka

Trending

  • Java's Quiet Revolution: Thriving in the Serverless Kubernetes Era
  • AI, ML, and Data Science: Shaping the Future of Automation
  • Recurrent Workflows With Cloud Native Dapr Jobs
  • Docker Model Runner: Streamlining AI Deployment for Developers
  1. DZone
  2. Data Engineering
  3. Big Data
  4. Kafka Message Filtering: An Analysis

Kafka Message Filtering: An Analysis

An analysis of how to implement a Kafka message filtering strategy, first as a general approach, then with the consumer needing to recover after deserialization errors.

By 
Horatiu Dan user avatar
Horatiu Dan
·
Mar. 19, 24 · Analysis
Likes (1)
Comment
Save
Tweet
Share
3.9K Views

Join the DZone community and get the full member experience.

Join For Free

A lot of companies nowadays use event-driven architectures in their day-to-day business activities, especially when they desire their applications to own real-time or near real-time reactiveness.

In such a scenario, during the interactions among the three main types of actors — producers, message brokers, and consumers – a lot of messages are exchanged. Nevertheless, under certain circumstances, some of these messages might not be of interest and thus they are discarded and ignored.

This article aims to analyze in detail how a consumer application shall be configured so that it behaves correctly when it needs to filter messages that are “irrelevant”. First, a standard record filter strategy is configured at the consumer level. Then, a custom deserialization mechanism is added and the analysis is refined. As stated, the intention is to preserve the correct behavior of the consumer.

Set-up

  • Java 21
  • Maven 3.9.2
  • Spring Boot – version 3.2.2
  • Redpanda message broker running in Docker – image version 23.2.15

As a message broker, the great and lightweight Redpanda is chosen. Since it is completely Kafka compatible, the development and the configuration do not need to be modified at all if deciding to change it with a different one. [Resource 1] describes how to accomplish the Redpanda minimal setup.

Once the Docker container is up and running, a topic called request is created with the following command:

Shell
 
>docker exec -it redpanda-0 rpk topic create request
TOPIC    STATUS
request  OK
Shell
 
>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
request             1           1


As shown, the request topic was created successfully. 

Implement a Record Filter Strategy

The use case is the following:

  • The producer sends a request to the configured topic
  • If the request message fulfills the acceptance criteria, the consumer processes it
  • Otherwise, the message is discarded

 A request message has a simple form: 

JSON
 
{
	"id": "34b25c6b-60d6-4e53-8f79-bdcdd17b3a2d",
	"contextId": "hcd"
}


Having just two fields, an identifier and a context identifier. 

Messages are taken into account only in a certain acceptable context. Differently, put, a message is accepted if it has a certain contextId, that is equal to the one configured on the consumer side, otherwise, it is discarded. 

A request is modeled by the following record: 

Java
 
public record Request(String id, String contextId) {
}


For configuring a producer and a consumer, at least these properties are needed (application.properties file): 

Properties files
 
# the path to the message broker
broker.url = localhost:19092

# the name of the topic
topic.request = request

# the unique string that identifies the consumer group of the consumer
context.id = hcd


The requirement is clear – only the messages having hcd as contextId are accepted. 

In order to send messages, a producer needs a KafkaTemplate instance, configured as below: 

Java
 
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

        return new KafkaTemplate<>(producerFactory);
    }
}


One may observe in the producer configuration that a StringSerializer was chosen for marshaling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract. Nevertheless, the choice here was intentional to increase the experimental flexibility.

Once the messages reach the request topic, a consumer is configured to pick them up.

Java
 
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${context.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

        DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(), new JsonDeserializer<>(Request.class));

        ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setRecordFilterStrategy(recordFilterStrategy);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}


Line 26 in the listing above allows injecting a record filtering strategy, which is exactly the purpose here – a means to decide whether a message is filtered out or not.

The RecordFilterStrategy interface has one abstract method:

Java
 
boolean filter(ConsumerRecord<K, V> consumerRecord);


Which according to its JavaDoc, returns true if the ConsumerRecord should be discarded (K represents the message key, while V the message value).

In the case of this proof of concept, all messages that have their contextId equal to hcd are accepted and consumed, while the rest are filtered out. The implementation is below.

Java
 
@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRecordFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        Request request = consumerRecord.value();

        boolean discard = !contextId.equals(request.contextId());
        LOG.info("{} is{} compliant.", request, discard ? "n't" : "");
        return discard;
    }
}


As part of the configuration, the KafkaListenerContainerFactory interface is responsible for creating the listener container of a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next. 

Java
 
@Component
public class RequestMessageListener {

    private static final Logger LOG = LoggerFactory.getLogger(RequestMessageListener.class);

    private final ResponseService responseService;

    public RequestMessageListener(ResponseService responseService) {
        this.responseService = responseService;
    }

    @KafkaListener(topics = "${topic.request}", groupId = "${context.id}")
    public void onMessage(@Payload Request request) {
        LOG.info("Processing {}.", request);

        responseService.send(Response.success());
    }
}


Its functionality is trivial, it logs the messages read from the request topic and destined to the configured consumer group. Then, it invokes a ResponseService which acts as the entity that sends a message back (here, it only logs it). 

Java
 
@Service
public class ResponseService {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseService.class);

    public void send(Response response) {
        LOG.info("Sending {}.", response);
    }
}


A Reponse is modeled simply, as below: 

Java
 
public record Response (String id,
                        Result result) {
 
    public static Response success() {
        return new Response(UUID.randomUUID().toString(), Result.SUCCESS);
    }
     
    public enum Result {
        SUCCESS, FAILURE
    }
}


When the application is started, provided the message broker is up, the listener is ready to receive messages. 

Plain Text
 
INFO 20080 --- [main] c.h.r.RecordFilterStrategyApplication               : Started RecordFilterStrategyApplication in 1.282 seconds (process running for 1.868)
INFO 20080 --- [main] fkaConsumerFactory$ExtendedKafkaConsumer            : [Consumer clientId=consumer-hcd-1, groupId=hcd] Subscribed to topic(s): request
INFO 20080 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-hcd-1, groupId=hcd] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hcd-1, groupId=hcd] Request joining group due to: need to re-join with the given member-id: consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hcd-1, groupId=hcd] Successfully joined group with generation Generation{generationId=7, memberId='consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc', protocol='range'}


In order to check the integration, the following two tests are used. Since a Request is expected by the listener, a compliance template was created for convenience. 

Java
 
@SpringBootTest
class RecordFilterStrategyApplicationTests {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @Value("${topic.request}")
    private String topic;
 
    @Value("${context.id}")
    private String contextId;
 
    private static final String template = """
        {
            "id": "%s",
            "contextId": "%s"
        }""";
 
    @Test
    void compliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), contextId));
    }
 
    @Test
    void notCompliant() {
        kafkaTemplate.send(topic,
                String.format(template, UUID.randomUUID(), "other context"));
    }
}


compliant() sends a message whose contextId is as this consumer has configured it. As expected, it is processed and a response is sent back. 

Plain Text
 
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy    : Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd] is compliant.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.RequestMessageListener : Processing Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd].
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService        : Sending Response[id=ebe0f65c-eddf-4866-b71f-e6cd766dd499, result=SUCCESS].


notCompliant() sends a message whose contextId is different from what was configured on this consumer. Thus, the message is neither processed, nor responded to, but ignored. 

Plain Text
 
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy : Request[id=ed22f60c-b13d-4315-8132-46aa83ddf33b, contextId=other context] isn't compliant.


So far, the proof of concept has exemplified how to configure the consumer with a record-filtering strategy so that only certain messages are accepted. 

The code for this part is here – 1-filter-strategy 

Implement a Record Filter Strategy With Custom Deserialization

Let’s assume that the messages that are consumed from the request queue are unmarshalled using a custom deserializer and the filtering is still required.

The custom deserializer here is trivial and has a didactic purpose. Moreover, in case the id field is missing, a runtime RequestDeserializationException is thrown. Such an action is not necessarily needed at this point, but it was put here to outline a certain use case. Read on.

Java
 
public class CustomRequestDeserializer extends StdDeserializer<Request> {
 
    private static final Logger LOG = LoggerFactory.getLogger(CustomRequestDeserializer.class);
 
    public CustomRequestDeserializer() {
        super(Request.class);
    }
 
    @Override
    public Request deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
        ObjectCodec oc = jsonParser.getCodec();
        JsonNode root = oc.readTree(jsonParser);
 
        final String contextId = deserializeField(root, "contextId");
 
        final String id = deserializeField(root, "id");
        if (id == null || id.isEmpty()) {
            throw new RequestDeserializationException("'id' is required");
        }
 
        Request request = new Request(id, contextId);
        LOG.info("Successfully deserialized {}", request);
        return request;
    }
}


To apply it, the Request record is annotated as below: 

Java
 
@JsonDeserialize(using = CustomRequestDeserializer.class)
public record Request(String id, String contextId) {
}


Up until now, the behavior described in the first part is preserved. If the previous compliant() and nonCompliant() tests are run again, the outcome is the same.

The next analyzed situation is the one in which a RequestDeserializationException is thrown when deserializing an incoming message. Let’s assume the id is empty, thus the form is as below:

JSON
 
{
    "id": "",
    "contextId": "hcd"
}
Java
 
@Test
void deserializationError_compliant() {
    kafkaTemplate.send(topic,
            String.format(template, "", contextId));
}


When such a message is received, the outcome is the following: 

Plain Text
 
...
Caused by: com.hcd.recordfilterstrategy.domain.deserialization.RequestDeserializationException: 'id' is required
...


An exception thrown at deserialization time determines the message to be neither consumed, nor responded to, but to be lost.

See [Resource 3] for a detailed analysis of situations like this.

One solution that allows recovering after deserialization exceptions is to configure the value deserializer of the KafkaListenerContainerFactory with a failed deserialization function – see line 15 below:

Java
 
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(CustomRecordFilterStrategy recordFilterStrategy,
                                                                                              FailedRequestDeserializationFunction failedDeserializationFunction) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());
 
    JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(Request.class);
 
    ErrorHandlingDeserializer<Request> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
    valueDeserializer.setFailedDeserializationFunction(failedDeserializationFunction);
 
    DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(), valueDeserializer);
 
    ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(defaultFactory);
    factory.setRecordFilterStrategy(recordFilterStrategy);
    factory.setCommonErrorHandler(new DefaultErrorHandler());
    return factory;
}


The purpose of the component is to allow recovery after such an exceptional situation and to be able to send a failure response back.

Java
 
@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {
 
    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);
 
    private final ResponseService responseService;
 
    public FailedRequestDeserializationFunction(ResponseService responseService) {
        this.responseService = responseService;
    }
 
    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();
 
        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());
 
            responseService.send(Response.failure());
 
        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }
 
        return null;
    }
}


 If the same test is run again and a compliant, but incorrect message is sent, the behavior changes. 

Plain Text
 
2024-03-13T10:52:38.893+02:00  INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T10:52:38.895+02:00  INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           : Sending Response[id=5393b4a0-3849-4130-934b-671e43a2358f, result=FAILURE].


The only left case is that of a non-compliant and incorrect message, meaning the id is still empty, but the contextId is different from the expected one. 

JSON
 
{
    "id": "",
    "contextId": "other context"
}


If the following test is run, nothing changes, unfortunately the failed deserialization function still sends a failure response back, although the record filtering strategy should have filtered the message out as the contextId is non-compliant. 

Java
 
@Test
void deserializationError_notCompliant() {
    kafkaTemplate.send(topic,
            String.format(template, "", "other context"));
}
Plain Text
 
2024-03-13T11:03:56.609+02:00  INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T11:03:56.610+02:00  INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService           : Sending Response[id=b345f001-3543-46da-bc0f-17c63c20e32a, result=FAILURE].


The code for this second section is here: 2-filter-strategy-custom-deser. 

Implement a Record Filter Strategy With Custom Deserialization – Correctly

The last part of this analysis provides a solution on how to address this last use case.

Before moving on with it, let’s recall what currently happens in all possible use cases:

Correct, Compliant Message

  1. Since the message is correct, the custom deserializer successfully unmarshalled it
  2. The failed deserialization function is not invoked
  3. Since the message is compliant, the record filter strategy does not reject it
  4. The listener is invoked, it processes the request and sends a response back

Correct, Non-Compliant Message

  1. Since the message is correct, the custom deserializer successfully unmarshalled it
  2. The failed deserialization function is not invoked
  3. Since the message is non-compliant, the record filter strategy rejects it
  4. The listener is not invoked

Incorrect, Compliant or Non-Compliant Message

  1. Since the message is incorrect, the custom deserializer throws an exception
  2. The failed deserialization is invoked and it sends a failure response back
  3. The record filter strategy is not invoked
  4. The listener is not invoked

In case of a correct message, the consumer application behaves correctly, irrespective of the compliance of the message.

In case of an incorrect message, a failed response is sent back, irrespective of the compliance of the message, which means the consumer behaves correctly only for compliant messages.

For incorrect, non-compliant messages it should act as follows:

  1. Since the message is incorrect, the custom deserializer throws an exception
  2. The failed deserialization is invoked and it sends a failure response back only if the message is compliant
  3. The record filter strategy is not invoked
  4. The listener is not invoked

At first glance, in order to cover the last use-case as well, only the FailedRequestDeserializationFunction needs to be enhanced to also check the message compliance.

Basically, before sending the response, the same check as the one in CustomRecordFilterStrategy shall be added. To avoid repetition, some refactoring is done.

To isolate the compliance check, a separate component in charge of it is created.

Java
 
@Component
public class RequestFilterStrategy {
 
    private static final Logger LOG = LoggerFactory.getLogger(RequestFilterStrategy.class);
 
    @Value("${context.id}")
    private String contextId;
 
    public boolean filter(String contextId) {
        boolean discard = !this.contextId.equals(contextId);
        LOG.info("Request is{} compliant.", discard ? "n't" : "");
        return discard;
    }
}


Then, the component is injected in the CustomRecordFilterStrategy and in the FailedRequestDeserializationFunction and consequently, they are refactored as follows.

Java
 
@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {
 
    private final RequestFilterStrategy requestFilterStrategy;
 
    public CustomRecordFilterStrategy(RequestFilterStrategy requestFilterStrategy) {
        this.requestFilterStrategy = requestFilterStrategy;
    }
 
    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        return requestFilterStrategy.filter(consumerRecord.value().contextId());
    }
}
Java
 
@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {
 
    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);
 
    private final RequestFilterStrategy requestFilterStrategy;
    private final ResponseService responseService;
 
    public FailedRequestDeserializationFunction(RequestFilterStrategy requestFilterStrategy,
                                                ResponseService responseService) {
        this.requestFilterStrategy = requestFilterStrategy;
        this.responseService = responseService;
    }
 
    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();
 
        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());
 
            if (!requestFilterStrategy.filter(deserializationEx.getContextId())) {
                responseService.send(Response.failure());
            }
        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }
 
        return null;
    }
}


To check the behavior, the last unit test is run again. 

Java
 
@Test
void deserializationError_notCompliant() {
    kafkaTemplate.send(topic,
            String.format(template, "", "other context"));
}


The output clearly shows that for incorrect, non-compliant messages, no response is sent anymore. 

Plain Text
 
2024-03-13T15:05:56.432+02:00  INFO 17916 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T15:05:56.432+02:00  INFO 17916 --- [ntainer#0-0-C-1] c.h.r.listener.RequestFilterStrategy     : Request isn't compliant.


The code for the enhanced solution is here: 3-filter-strategy-custom-deser-covered 

Resources

  1. Redpanda Quickstart
  2. Spring for Apache Kafka Reference
  3. Acting Soon on Kafka Deserialization Errors
  4. The picture was taken at Legoland, Germany
Message broker kafka

Published at DZone with permission of Horatiu Dan. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Evaluating Message Brokers
  • ActiveMQ JMS (Java Messaging Service) vs. Data Streaming Kafka With Camel Code Sample
  • Event-Driven Architectures: Designing Scalable and Resilient Cloud Solutions
  • How to Integrate Event-Driven Ansible With Kafka

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!