
Setting up Kafka Cluster With Gluster-Block Storage


Red Hat AMQ Streams

Red Hat AMQ Streams is a massively-scalable, distributed, and high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.

Kafka Bridge

AMQ Streams Kafka Bridge provides a RESTful interface that allows HTTP-based clients to interact with a Kafka cluster. Kafka Bridge offers the advantages of a web API connection to AMQ Streams, without the need for client applications to interpret the Kafka protocol.

The API has two main resources — consumers and topics — that are exposed and made accessible through endpoints to interact with consumers and producers in your Kafka cluster. The resources relate only to the Kafka Bridge, not the consumers and producers connected directly to Kafka.

The bridge provides a REST API, described by an OpenAPI specification, which exposes multiple endpoints to allow typical Apache Kafka operations:

  • Sending messages to topics (including to a specific partition)
  • Subscribing to one or more topics (even using a pattern) as part of a consumer group, or asking for a specific partition assignment
  • Receiving messages from the subscribed topics
  • Committing offsets related to the received messages
  • Seeking to a specific position (or at the beginning/end) in a topic partition

The client behavior and the interaction with the Apache Kafka cluster through the bridge are the same as with a native Kafka client, but with HTTP/1.1 protocol semantics.

Each endpoint allows specific HTTP methods (GET, POST, DELETE) to execute the above operations.

Prerequisites

For this demonstration, you will need the following technologies set up in your development environment:

  1. An OpenShift 3.11+ environment with cluster-admin access
  2. Gluster-block storage installed on OpenShift
  3. OpenShift CLI (oc)
  4. Apache Maven 3.6.3+
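
A quick sanity check of the client-side tooling before starting (the versions reported will depend on your environment):

Shell

# Confirm the OpenShift CLI and Maven are available locally
oc version
mvn -version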

In this article, we demonstrate how to set up a persistent Kafka cluster with 3 Kafka brokers and 3 ZooKeeper nodes backed by gluster-block storage, and how to expose it through the Kafka Bridge.

Set Up Kafka Cluster

Download the example and installation files from the Red Hat Customer Portal or from the Strimzi project.

AMQ Streams requires block storage and is designed to work optimally with cloud-based block storage solutions, including Amazon Elastic Block Store (EBS). The use of file storage is not recommended.

Choose local storage (local persistent volumes) when possible. If local storage is not available, you can use a Storage Area Network (SAN) accessed by a protocol such as Fibre Channel or iSCSI.

There is no need to provision replicated storage because Kafka and ZooKeeper both have built-in data replication.

It is recommended that you configure your storage system to use the XFS file system. AMQ Streams is also compatible with the ext4 file system, but this might require additional configuration for the best results.

For this demonstration, gluster-block is installed on OpenShift.

Verify that the storage class is up and running. In this demo, we will use PVCs provisioned from gluster-block to persist the data that is written to the Kafka commit log by each node:

Shell

oc get sc
NAME                                PROVISIONER                          AGE
glusterfs-storage                   kubernetes.io/glusterfs              4d
glusterfs-storage-block (default)   gluster.org/glusterblock-glusterfs   4d



So we have our gluster-block storage class set as the default, which means we can start consuming PVs. To deploy our Kafka cluster, we will use a Kafka custom resource:

YAML

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.3.0
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.3"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 3Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}



The above custom resource will create 3 replicas for both ZooKeeper and Kafka. Data will also be replicated 3 times across the Kafka cluster nodes, and each created PV backs the Kafka commit log of one node. Those PVs are created automatically in the gluster-block storage class because it is the default.
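
To create the cluster, apply the resource. A minimal sketch, assuming the manifest above is saved as kafka-cluster.yaml (a hypothetical file name) and the AMQ Streams/Strimzi Cluster Operator is already running in the current project:

Shell

# Assumes the Kafka resource above is saved as kafka-cluster.yaml
# and the Cluster Operator has already been deployed in this namespace
oc apply -f kafka-cluster.yaml

Once the operator reconciles the resource, the ZooKeeper and Kafka pods come up: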

Shell

oc get pods
NAME                                          READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-7974b77699-95gs4   3/3     Running   0          4m
my-cluster-kafka-0                            2/2     Running   0          6m
my-cluster-kafka-1                            2/2     Running   0          6m
my-cluster-kafka-2                            2/2     Running   1          6m
my-cluster-zookeeper-0                        2/2     Running   0          6m
my-cluster-zookeeper-1                        2/2     Running   0          6m
my-cluster-zookeeper-2                        2/2     Running   0          6m
strimzi-cluster-operator-5bf8d947f9-kc6jv     1/1     Running   0          7m



Now verify the PVs:

Shell

oc get pv -n glusterfs
NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                               STORAGECLASS              REASON   AGE
pvc-4b338dfb-cbdf-11ea-b116-fa163eb9ef7e   1Gi        RWO            Delete           Bound    kafka/data-my-cluster-zookeeper-1   glusterfs-storage-block            12m
pvc-4b353de6-cbdf-11ea-b116-fa163eb9ef7e   1Gi        RWO            Delete           Bound    kafka/data-my-cluster-zookeeper-2   glusterfs-storage-block            12m
pvc-4b3b2993-cbdf-11ea-b116-fa163eb9ef7e   1Gi        RWO            Delete           Bound    kafka/data-my-cluster-zookeeper-0   glusterfs-storage-block            12m
pvc-6c222269-cbdf-11ea-b116-fa163eb9ef7e   3Gi        RWO            Delete           Bound    kafka/data-0-my-cluster-kafka-0     glusterfs-storage-block            12m
pvc-6c22c2fd-cbdf-11ea-b116-fa163eb9ef7e   3Gi        RWO            Delete           Bound    kafka/data-0-my-cluster-kafka-2     glusterfs-storage-block            11m
pvc-6c23246e-cbdf-11ea-b116-fa163eb9ef7e   3Gi        RWO            Delete           Bound    kafka/data-0-my-cluster-kafka-1     glusterfs-storage-block            12m



Now create a Kafka topic:

Shell

oc exec -it my-cluster-kafka-0 -c kafka -- bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 3
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic test.

oc get kt
NAME   PARTITIONS   REPLICATION FACTOR
test   3            3



Set Up Kafka Bridge

To deploy the bridge, define a KafkaBridge custom resource:

YAML

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080
  consumer:
    config:
      auto.offset.reset: earliest
  producer:
    config:
      delivery.timeout.ms: 300000



The above custom resource creates the Kafka Bridge deployment and its service.
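
Apply it in the same way as the Kafka resource. A minimal sketch, assuming the manifest above is saved as kafka-bridge.yaml (a hypothetical file name):

Shell

# Assumes the KafkaBridge resource above is saved as kafka-bridge.yaml
oc apply -f kafka-bridge.yaml

Then verify that the bridge pod, service, and deployment are running: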

Shell

oc get all | grep bridge
pod/my-bridge-bridge-d847cb4b7-qkchv              1/1     Running   0          1m
service/my-bridge-bridge-service      ClusterIP   172.30.107.103   <none>        8080/TCP                     1m
deployment.apps/my-bridge-bridge             1         1         1            1           1m
replicaset.apps/my-bridge-bridge-d847cb4b7              1         1         1       1m



Clients for the Kafka Bridge

Internal Clients
Internal clients are container-based HTTP clients running in the same OpenShift cluster as the Kafka Bridge itself. Internal clients can access the Kafka Bridge on the host and port defined in the Kafka Bridge custom resource.

External Clients

External clients are HTTP clients running outside the OpenShift cluster in which the Kafka Bridge is deployed and running. External clients can access the Kafka Bridge through an OpenShift Route, a load balancer service, or an Ingress.

Now create a route for service/my-bridge-bridge-service so that external clients can access the bridge:

Shell

oc expose service/my-bridge-bridge-service
route.route.openshift.io/my-bridge-bridge-service exposed

oc get routes
NAME                       HOST/PORT                                                     PATH   SERVICES                   PORT       TERMINATION   WILDCARD
my-bridge-bridge-service   my-bridge-bridge-service-kafka.apps.xxx.redhat.com                   my-bridge-bridge-service   rest-api                 None



Producing Endpoints

The bridge exposes two main REST endpoints to send messages:

  • /topics/{topicname}
  • /topics/{topicname}/partitions/{partitionid}

The first one is used to send a message to the topic topicname, while the second one allows the user to specify the partition via partitionid. Even when using the first endpoint, the user can specify the destination partition in the body of the message.

To send a message, a producer has to connect to the bridge using an HTTP POST request to the specific endpoint, with a JSON payload containing one or more records. Each record carries a value and, optionally, a key and a partition:

  • a record with a key and a value is sent to the partition determined by the hash of the key
  • a record with an explicit destination partition and a value goes straight to that partition
  • a record with only a value is assigned a partition by the bridge using a round-robin mechanism

The example below sends a batch of keyed records; a sketch of the other two forms follows it.
Shell

curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com/topics/test \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "key-1",
            "value": "value-1"
        },
        {
            "key": "key-2",
            "value": "value-2"
        }
    ]
}'

{"offsets":[{"partition":1,"offset":0},{"partition":2,"offset":0}]}
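
For completeness, here is a sketch of the other two record forms described above; the partition numbers and values are only illustrative:

Shell

# Second form: the record carries an explicit destination partition
curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com/topics/test \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "value": "value-3",
            "partition": 0
        }
    ]
}'

# Third form: value only; the bridge assigns the partition round-robin.
# The same payload could also be POSTed to /topics/test/partitions/0 to pick the partition via the URL.
curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com/topics/test \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "value": "value-4"
        }
    ]
}'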



Consuming Endpoints

From a consumer perspective, the bridge is much more complex because of the way consuming messages from Apache Kafka works with respect to consumer groups and partition rebalancing. For this reason, before subscribing to topics and starting to receive messages, an HTTP client has to “create” a corresponding consumer on the bridge, which also means joining a consumer group. This happens through an HTTP POST on the following endpoint, providing a consumer configuration in the JSON payload.

Shell

curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com/consumers/my-group \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "name": "my-consumer",
    "format": "json",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": false
  }'

{"instance_id":"my-consumer","base_uri":"http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer"}



The bridge creates a new consumer in the group and returns the so-called base_uri to the client, which is the URL the client has to use for sending the subsequent requests (i.e. subscribe, poll, and so on).

The HTTP consumer will interact with the following endpoints for subscribing to topics, getting messages, committing offsets, and finally deleting the consumer.

  • /consumers/{groupid}/instances/{name}/subscription
  • /consumers/{groupid}/instances/{name}/records
  • /consumers/{groupid}/instances/{name}/offsets
  • /consumers/{groupid}/instances/{name}

Subscribe

Shell

curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/subscription \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "topics": [
        "test"
    ]
}'



Consume

As with a native Apache Kafka client, getting messages means doing a “poll” operation, which in terms of the HTTP protocol means doing HTTP GET requests on the relevant endpoint; the bridge returns an array of records with the topic, key, value, partition, and offset.

Shell

curl -X GET http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/records \
  -H 'accept: application/vnd.kafka.json.v2+json'

[{"topic":"test","key":"key-2","value":"value-2","partition":2,"offset":0}]



After consuming messages, if the auto-commit feature was not enabled on consumer creation, it is necessary to commit the offsets via an HTTP POST request specifying an offsets collection with the topic, partition, and offset to commit.

Shell

curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/offsets
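
A sketch of a full commit request, following the payload format described above and the record consumed earlier from partition 2 (the committed offset is commonly the last consumed offset plus one):

Shell

# Commit offset 1 for partition 2 of the "test" topic (values follow the record consumed above)
curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/offsets \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
        {
            "topic": "test",
            "partition": 2,
            "offset": 1
        }
    ]
}'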



The bridge also exposes endpoints for seeking in a topic partition to the beginning, to the end, or to a specific offset.

  • /consumers/{groupid}/instances/{name}/positions
  • /consumers/{groupid}/instances/{name}/positions/beginning
  • /consumers/{groupid}/instances/{name}/positions/end

To seek to a specific position in the partition, the consumer must provide offset information through the JSON payload in the HTTP POST request. The format is the same as used to commit the offset.
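
A sketch of a seek request, and of deleting the consumer when finished; the offset value is illustrative, and the seek payload reuses the offset-commit format as described above:

Shell

# Seek the consumer to offset 0 of partition 0 of the "test" topic
curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/positions \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
        {
            "topic": "test",
            "partition": 0,
            "offset": 0
        }
    ]
}'

# When done, delete the consumer instance so it leaves the consumer group cleanly
curl -X DELETE http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer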

Conclusion

Exposing the Apache Kafka cluster to clients using HTTP enables scenarios where the use of native clients is not desirable. Such situations include resource-constrained devices, network availability, and security considerations. Interaction with the bridge is similar to the native Apache Kafka clients but using the semantics of an HTTP REST API. The inclusion of the HTTP Bridge in Strimzi enhances the options available to developers when building applications with Apache Kafka.

Topics:
apache kafka, apache zookeeper, kafka, kafka architecture, kafka cluster, red hat
