
Setting up Kafka Cluster With Gluster-Block Storage

Red Hat AMQ Streams is a massively scalable, distributed, and high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.

By Ramu Kakarla · Jul. 22, 20 · Tutorial

Red Hat AMQ Streams

Red Hat AMQ Streams is a massively scalable, distributed, and high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.

Kafka Bridge

The AMQ Streams Kafka Bridge provides a RESTful interface that allows HTTP-based clients to interact with a Kafka cluster. The Kafka Bridge offers the advantages of a web API connection to AMQ Streams without the need for client applications to interpret the Kafka protocol.

The API has two main resources — consumers and topics — that are exposed and made accessible through endpoints to interact with consumers and producers in your Kafka cluster. The resources relate only to the Kafka Bridge, not the consumers and producers connected directly to Kafka.

The bridge provides a REST API, described by an OpenAPI specification, which exposes multiple endpoints to allow typical Apache Kafka operations:

  • Sending messages to topics (including to a specific partition)
  • Subscribing to one or more topics (even using a pattern) as part of a consumer group, or asking for a specific partition assignment
  • Receiving messages from the subscribed topics
  • Committing offsets related to the received messages
  • Seeking to a specific position (or at the beginning/end) in a topic partition

The client behavior and the interaction with the Apache Kafka cluster through the bridge are the same as with a native Kafka client, but with HTTP/1.1 protocol semantics.

Each endpoint allows specific HTTP methods (GET, POST, DELETE) to execute the above operations.

Prerequisites

For this demonstration, you will need the following technologies set up in your development environment:

  1. An OpenShift 3.11+ environment with cluster-admin access
  2. Gluster-block storage installed on OpenShift
  3. The OpenShift CLI (oc)
  4. Apache Maven 3.6.3+

In this article, we demonstrate how to set up a persistent Kafka cluster with three Kafka brokers and three ZooKeeper nodes backed by gluster-block storage, and how to expose it through the Kafka Bridge.

Set Up Kafka Cluster

Download the example/install files from the Red Hat Customer Portal or the Strimzi project.
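The downloaded archive contains install/cluster-operator manifests for the Cluster Operator. A minimal sketch of deploying it into a kafka project, assuming the directory layout of the AMQ Streams/Strimzi install examples (the namespace name is an assumption):

Shell

# Create a project for the cluster and point the operator RoleBindings at it
oc new-project kafka
sed -i 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml

# Deploy the Cluster Operator
oc apply -f install/cluster-operator -n kafka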

AMQ Streams requires block storage and is designed to work optimally with cloud-based block storage solutions, including Amazon Elastic Block Store (EBS). The use of file storage is not recommended.

Choose local storage (local persistent volumes) when possible. If local storage is not available, you can use a Storage Area Network (SAN) accessed by a protocol such as Fibre Channel or iSCSI.

There is no need to provision replicated storage because Kafka and ZooKeeper both have built-in data replication.

It is recommended that you configure your storage system to use the XFS file system. AMQ Streams is also compatible with the ext4 file system, but this might require additional configuration for the best results.

For this demonstration, gluster-block is installed on OpenShift.

Verify that the storage class is up and running. In this demo, we use PVCs taken from gluster-block to persist the data that each node writes to the Kafka commit log:

Shell

oc get sc
NAME                                PROVISIONER                          AGE
glusterfs-storage                   kubernetes.io/glusterfs              4d
glusterfs-storage-block (default)   gluster.org/glusterblock-glusterfs   4d
We have our gluster-block storage class set as the default, which means we can start consuming PVs. To deploy our Kafka cluster, we will use the following Kafka custom resource:

YAML

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.3.0
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.3"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 3Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

The above custom resource creates three replicas each for ZooKeeper and Kafka; topic data is also replicated three times across the Kafka cluster nodes, with each created PV backing a node's Kafka commit log. Those PVs are created automatically in the gluster-block storage class because it is the default.
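A minimal sketch of applying the resource and watching the rollout (the file name kafka-cluster.yaml and the kafka namespace are assumptions):

Shell

# Create the Kafka cluster from the custom resource above
oc apply -f kafka-cluster.yaml -n kafka

# Watch the Cluster Operator bring up the ZooKeeper and Kafka pods
oc get pods -n kafka -w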

Shell

oc get pods
NAME                                          READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-7974b77699-95gs4   3/3     Running   0          4m
my-cluster-kafka-0                            2/2     Running   0          6m
my-cluster-kafka-1                            2/2     Running   0          6m
my-cluster-kafka-2                            2/2     Running   1          6m
my-cluster-zookeeper-0                        2/2     Running   0          6m
my-cluster-zookeeper-1                        2/2     Running   0          6m
my-cluster-zookeeper-2                        2/2     Running   0          6m
strimzi-cluster-operator-5bf8d947f9-kc6jv     1/1     Running   0          7m

Now verify the PVs:

Shell

oc get pv -n glusterfs
NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                               STORAGECLASS              REASON   AGE
pvc-4b338dfb-cbdf-11ea-b116-fa163eb9ef7e   1Gi        RWO            Delete           Bound    kafka/data-my-cluster-zookeeper-1   glusterfs-storage-block            12m
pvc-4b353de6-cbdf-11ea-b116-fa163eb9ef7e   1Gi        RWO            Delete           Bound    kafka/data-my-cluster-zookeeper-2   glusterfs-storage-block            12m
pvc-4b3b2993-cbdf-11ea-b116-fa163eb9ef7e   1Gi        RWO            Delete           Bound    kafka/data-my-cluster-zookeeper-0   glusterfs-storage-block            12m
pvc-6c222269-cbdf-11ea-b116-fa163eb9ef7e   3Gi        RWO            Delete           Bound    kafka/data-0-my-cluster-kafka-0     glusterfs-storage-block            12m
pvc-6c22c2fd-cbdf-11ea-b116-fa163eb9ef7e   3Gi        RWO            Delete           Bound    kafka/data-0-my-cluster-kafka-2     glusterfs-storage-block            11m
pvc-6c23246e-cbdf-11ea-b116-fa163eb9ef7e   3Gi        RWO            Delete           Bound    kafka/data-0-my-cluster-kafka-1     glusterfs-storage-block            12m

Now create a Kafka topic:

Shell

oc exec -it my-cluster-kafka-0 -c kafka -- bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 3
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic test.

oc get kt
NAME   PARTITIONS   REPLICATION FACTOR
test   3            3

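Because the entityOperator section of the Kafka resource enables the Topic Operator, the same topic could alternatively be declared as a KafkaTopic custom resource instead of calling kafka-topics.sh. A minimal sketch, assuming the cluster name my-cluster used above:

Shell

# Declare the topic as a KafkaTopic resource managed by the Topic Operator
cat <<'EOF' | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: test
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
EOF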


Set Up Kafka Bridge

YAML

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080
  consumer:
    config:
      auto.offset.reset: earliest
  producer:
    config:
      delivery.timeout.ms: 300000

Applying the above custom resource creates the Kafka Bridge deployment and its ClusterIP service, which can be verified as shown below.
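A minimal sketch of applying the resource (the file name kafka-bridge.yaml and the kafka namespace are assumptions):

Shell

# Create the Kafka Bridge from the custom resource above
oc apply -f kafka-bridge.yaml -n kafka

# The operator reports the bridge resource it manages
oc get kafkabridge my-bridge -n kafka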

Shell

oc get all | grep bridge
pod/my-bridge-bridge-d847cb4b7-qkchv              1/1     Running   0          1m
service/my-bridge-bridge-service      ClusterIP   172.30.107.103   <none>        8080/TCP                     1m
deployment.apps/my-bridge-bridge             1         1         1            1           1m
replicaset.apps/my-bridge-bridge-d847cb4b7              1         1         1       1m

Clients for the Kafka Bridge

Internal Clients

Internal clients are container-based HTTP clients running in the same OpenShift cluster as the Kafka Bridge itself. Internal clients can access the Kafka Bridge on the host and port defined in the Kafka Bridge custom resource.

External Clients

External clients are HTTP clients running outside the OpenShift cluster in which the Kafka Bridge is deployed and running. External clients can access the Kafka Bridge through an OpenShift route, a load balancer service, or an Ingress.

Now create a route for service/my-bridge-bridge-service so that external clients can access the bridge:

Shell

oc expose service/my-bridge-bridge-service
route.route.openshift.io/my-bridge-bridge-service exposed

oc get routes
NAME                       HOST/PORT                                             PATH   SERVICES                   PORT       TERMINATION   WILDCARD
my-bridge-bridge-service   my-bridge-bridge-service-kafka.apps.xxx.redhat.com           my-bridge-bridge-service   rest-api                 None

Producing Endpoints

The bridge exposes two main REST endpoints to send messages:

  • /topics/{topicname}
  • /topics/{topicname}/partitions/{partitionid}

The first is used to send a message to the topic topicname, while the second allows the user to target a specific partition via partitionid. Even when using the first endpoint, the user can specify the destination partition in the body of the message.

To send a message, a producer connects to the bridge with an HTTP POST request to the specific endpoint, using a JSON payload that contains the value and, optionally, the key and partition. A record in the payload can take one of three forms:

  • If it has a key and a value, the bridge sends it to the partition determined by the hash of the key.
  • If it has a specified destination partition and a value, the bridge sends it to that partition.
  • If it has only a value, the bridge applies a round-robin mechanism to determine the partition.
Shell

curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com/topics/test \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "key-1",
            "value": "value-1"
        },
        {
            "key": "key-2",
            "value": "value-2"
        }
    ]
}'

{"offsets":[{"partition":1,"offset":0},{"partition":2,"offset":0}]}

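To target a specific partition explicitly, the second producing endpoint can be used instead, with the partition taken from the URL rather than from the record. A minimal sketch, assuming partition 0 of the test topic:

Shell

curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com/topics/test/partitions/0 \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "value": "value-3"
        }
    ]
}'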


Consuming Endpoints

From a consumer perspective, the bridge is more complex because of how consuming messages from Apache Kafka works with consumer groups and partition rebalancing. For this reason, before subscribing to topics and starting to receive messages, an HTTP client has to "create" a corresponding consumer on the bridge, which also means joining a consumer group. This happens through an HTTP POST on the following endpoint, providing a consumer configuration in the JSON payload.

Shell

curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com/consumers/my-group \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "name": "my-consumer",
    "format": "json",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": false
  }'

{"instance_id":"my-consumer","base_uri":"http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer"}


The bridge creates a new consumer in the group and returns the so-called base_uri to the client, which is the URL the client has to use for the subsequent requests (subscribing, polling, and so on).

The HTTP consumer will interact with the following endpoints for subscribing to topics, getting messages, committing offsets, and finally deleting the consumer.

  • /consumers/{groupid}/instances/{name}/subscription
  • /consumers/{groupid}/instances/{name}/records
  • /consumers/{groupid}/instances/{name}/offsets
  • /consumers/{groupid}/instances/{name}

Subscribe

Shell

curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/subscription \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "topics": [
        "test"
    ]
}'


Consume

As with a native Apache Kafka client, getting messages means doing a "poll" operation, which in HTTP terms means making GET requests on the relevant endpoint; the bridge returns an array of records with the topic, key, value, partition, and offset.

Shell

curl -X GET http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/records \
  -H 'accept: application/vnd.kafka.json.v2+json'

[{"topic":"test","key":"key-2","value":"value-2","partition":2,"offset":0}]


After consuming messages, if the auto-commit feature was not enabled on consumer creation, it is necessary to commit the offsets via an HTTP POST request specifying an offsets collection with the topic, partition, and offset to commit; if the request body is left empty, the offsets for all the records returned to the consumer so far are committed.

Shell

curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/offsets

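To commit specific offsets rather than everything fetched so far, an offsets collection can be sent in the request body. A minimal sketch with illustrative topic, partition, and offset values:

Shell

curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/offsets \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
        {
            "topic": "test",
            "partition": 2,
            "offset": 1
        }
    ]
}'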


The bridge also exposes endpoints for seeking in a topic partition to the beginning, to the end, or to a specific offset:

  • /consumers/{groupid}/instances/{name}/positions
  • /consumers/{groupid}/instances/{name}/positions/beginning
  • /consumers/{groupid}/instances/{name}/positions/end

To seek to a specific position in the partition, the consumer must provide offset information through the JSON payload in the HTTP POST request. The format is the same as used to commit the offset.
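A minimal sketch with illustrative values, seeking the consumer back to offset 0 of partition 0 of the test topic and then, as listed among the consumer endpoints above, deleting the consumer instance once it is no longer needed:

Shell

# Seek to a specific offset in a topic partition
curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/positions \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
        {
            "topic": "test",
            "partition": 0,
            "offset": 0
        }
    ]
}'

# Delete the consumer instance from the bridge when done
curl -X DELETE http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer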

Conclusion

Exposing the Apache Kafka cluster to clients over HTTP enables scenarios where the use of native clients is not desirable, such as resource-constrained devices, limited network availability, or security considerations. Interaction with the bridge is similar to that of the native Apache Kafka clients, but with the semantics of an HTTP REST API. The inclusion of the HTTP bridge in Strimzi enhances the options available to developers when building applications with Apache Kafka.


Opinions expressed by DZone contributors are their own.
