Setting Up a Kafka Cluster With Gluster-Block Storage
Red Hat AMQ Streams
Red Hat AMQ Streams is a massively-scalable, distributed, and high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.
Kafka Bridge
AMQ Streams Kafka Bridge provides a RESTful interface that allows HTTP-based clients to interact with a Kafka cluster. The Kafka Bridge offers the advantages of a web API connection to AMQ Streams, without the need for client applications to interpret the Kafka protocol.
The API has two main resources — consumers and topics — that are exposed and made accessible through endpoints to interact with consumers and producers in your Kafka cluster. The resources relate only to the Kafka Bridge, not the consumers and producers connected directly to Kafka.
The bridge provides a REST API, described by an Open API specification, which exposes multiple endpoints to allow typical Apache Kafka operations:
- Sending messages to topics (including to a specific partition)
- Subscribing to one or more topics (even using a pattern) as part of a consumer group, or asking for a specific partition assignment
- Receiving messages from the subscribed topics
- Committing offsets related to the received messages
- Seeking to a specific position (or at the beginning/end) in a topic partition
Client behavior and interaction with the Apache Kafka cluster through the bridge are the same as with a native Kafka client, but with HTTP/1.1 protocol semantics.
Each endpoint allows specific HTTP methods (GET, POST, DELETE) to execute the above operations.
Prerequisites
For this demonstration, you will need the following technologies set up in your development environment:
- An OpenShift 3.11+ environment with cluster admin access
- Gluster-block storage installed on OpenShift
- The OpenShift CLI (oc)
- Apache Maven 3.6.3+
In this article, we demonstrate how to set up a persistent Kafka cluster with three Kafka brokers and three ZooKeeper nodes backed by gluster-block storage, and expose it through the Kafka Bridge.
Set Up Kafka Cluster
Download the example/install files from the Red Hat Customer Portal or the Strimzi project.
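As a rough sketch of the initial deployment (assuming the archive is extracted to amq-streams-1.3.0-ocp-install-examples, as shown in the shell prompts later in this article, and that the Cluster Operator manifests live under install/cluster-operator as in the upstream examples), the operator can be installed into a kafka project:
oc new-project kafka
# If you use a different project name, adjust the namespace references in the RoleBinding manifests first.
oc apply -f install/cluster-operator -n kafka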
AMQ Streams requires block storage and is designed to work optimally with cloud-based block storage solutions, including Amazon Elastic Block Store (EBS). The use of file storage is not recommended.
Choose local storage (local persistent volumes) when possible. If local storage is not available, you can use a Storage Area Network (SAN) accessed by a protocol such as Fibre Channel or iSCSI.
There is no need to provision replicated storage because Kafka and Zookeeper both have built-in data replication.
It is recommended that you configure your storage system to use the XFS file system. AMQ Streams is also compatible with the ext4 file system, but this might require additional configuration for the best results.
For this demonstration, gluster-block is installed on OpenShift.
Verify that the storage class is up and running. In this demo, we will use PVCs provisioned from gluster-block to persist the data that each node writes to the Kafka commit log.
oc get sc
NAME PROVISIONER AGE
glusterfs-storage kubernetes.io/glusterfs 4d
glusterfs-storage-block (default) gluster.org/glusterblock-glusterfs 4d
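(If glusterfs-storage-block were not already marked as the default, a sketch of setting the standard Kubernetes default-class annotation would be:)
oc patch storageclass glusterfs-storage-block \
  -p '{"metadata": {"annotations": {"storageclass.kubernetes.io/is-default-class": "true"}}}'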
So we have our gluster-block storage class set as the default, which means we can start consuming PVs. To deploy our Kafka cluster, we will use a Kafka custom resource:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.3.0
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.3"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 3Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
The above custom resource creates 3 replicas for both ZooKeeper and Kafka; data is also replicated 3 times across the Kafka cluster nodes, with each created PV backing the Kafka commit log. Those PVs are created automatically from the gluster-block storage class, as it is the default one.
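Assuming the manifest above is saved as kafka-persistent.yaml (a file name chosen here for illustration), apply it and wait for the pods to start:
oc apply -f kafka-persistent.yaml -n kafka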
oc get pods
NAME READY STATUS RESTARTS AGE
my-cluster-entity-operator-7974b77699-95gs4 3/3 Running 0 4m
my-cluster-kafka-0 2/2 Running 0 6m
my-cluster-kafka-1 2/2 Running 0 6m
my-cluster-kafka-2 2/2 Running 1 6m
my-cluster-zookeeper-0 2/2 Running 0 6m
my-cluster-zookeeper-1 2/2 Running 0 6m
my-cluster-zookeeper-2 2/2 Running 0 6m
strimzi-cluster-operator-5bf8d947f9-kc6jv 1/1 Running 0 7m
Now verify the PVs.
oc get pv -n glusterfs
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
pvc-4b338dfb-cbdf-11ea-b116-fa163eb9ef7e 1Gi RWO Delete Bound kafka/data-my-cluster-zookeeper-1 glusterfs-storage-block 12m
pvc-4b353de6-cbdf-11ea-b116-fa163eb9ef7e 1Gi RWO Delete Bound kafka/data-my-cluster-zookeeper-2 glusterfs-storage-block 12m
pvc-4b3b2993-cbdf-11ea-b116-fa163eb9ef7e 1Gi RWO Delete Bound kafka/data-my-cluster-zookeeper-0 glusterfs-storage-block 12m
pvc-6c222269-cbdf-11ea-b116-fa163eb9ef7e 3Gi RWO Delete Bound kafka/data-0-my-cluster-kafka-0 glusterfs-storage-block 12m
pvc-6c22c2fd-cbdf-11ea-b116-fa163eb9ef7e 3Gi RWO Delete Bound kafka/data-0-my-cluster-kafka-2 glusterfs-storage-block 11m
pvc-6c23246e-cbdf-11ea-b116-fa163eb9ef7e 3Gi RWO Delete Bound kafka/data-0-my-cluster-kafka-1 glusterfs-storage-block 12m
Now create a Kafka topic:
oc exec -it my-cluster-kafka-0 -c kafka -- bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 3
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic test.
oc get kt
NAME PARTITIONS REPLICATION FACTOR
test 3 3
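Because the Entity Operator's Topic Operator is running, the same topic could alternatively be declared as a KafkaTopic custom resource; a minimal sketch mirroring the command above:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: test
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3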
Set up Kafka Bridge
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080
  consumer:
    config:
      auto.offset.reset: earliest
  producer:
    config:
      delivery.timeout.ms: 300000
The above custom resource creates the Kafka Bridge deployment and its service.
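Assuming the manifest is saved as kafka-bridge.yaml (again, an illustrative file name), apply it:
oc apply -f kafka-bridge.yaml -n kafka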
oc get all | grep bridge
pod/my-bridge-bridge-d847cb4b7-qkchv 1/1 Running 0 1m
service/my-bridge-bridge-service ClusterIP 172.30.107.103 <none> 8080/TCP 1m
deployment.apps/my-bridge-bridge 1 1 1 1 1m
replicaset.apps/my-bridge-bridge-d847cb4b7 1 1 1 1m
Clients for the Kafka Bridge
Internal Clients: Container-based HTTP clients running in the same OpenShift cluster can reach the bridge directly on the port configured in the KafkaBridge custom resource.
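As an illustration (assuming the bridge runs in the kafka namespace, as the PVC names above suggest), an internal client could produce to the test topic through the service's cluster DNS name:
# Run from any pod inside the cluster; the service name and port come from the resources listed above.
curl -X POST http://my-bridge-bridge-service.kafka.svc:8080/topics/test \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{"records":[{"value":"hello-from-inside"}]}'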
External Clients
Now create a route for service/my-bridge-bridge-service so that external clients can access the bridge:
oc expose service/my-bridge-bridge-service
route.route.openshift.io/my-bridge-bridge-service exposed
oc get routes
NAME HOST/PORT PATH SERVICES PORT TERMINATION WILDCARD
my-bridge-bridge-service my-bridge-bridge-service-kafka.apps.xxx.redhat.com my-bridge-bridge-service rest-api None
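To sanity-check the route before producing, one option (a sketch, assuming the bridge's GET /topics endpoint for listing topic names) is:
curl http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com/topics
The response should include the test topic created earlier.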
Producing Endpoints
The bridge exposes two main REST endpoints to send messages:
/topics/{topicname}
/topics/{topicname}/partitions/{partitionid}
The first is used to send a message to the topic topicname, while the second allows the user to specify the target partition via partitionid. Even with the first endpoint, the user can specify the destination partition in the body of the message.
To send a message, a producer has to connect to the bridge using an HTTP POST request to the specific endpoint, with a JSON payload containing the value and, optionally, the key and partition. Records in the payload can take different shapes:
- A record with a key and a value: the bridge sends it to a partition based on the hash of the key
- A record with an explicit destination partition and a value: the bridge sends it to that partition
- A record with only a value: the bridge applies a round-robin mechanism to determine the partition
curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com/topics/test -H 'content-type:application/vnd.kafka.json.v2+json' -d '{
> "records": [
> {
> "key": "key-1",
> "value": "value-1"
> },
> {
> "key": "key-2",
> "value": "value-2"
> }
> ]
> }'
{"offsets":[{"partition":1,"offset":0},{"partition":2,"offset":0}]}
Consuming Endpoints
From a consumer perspective, the bridge is more complex because of how consuming messages from Apache Kafka works with respect to consumer groups and partition rebalancing. For this reason, before subscribing to topics and starting to receive messages, an HTTP client has to "create" a corresponding consumer on the bridge, which also means joining a consumer group. This happens through an HTTP POST on the /consumers/{groupid} endpoint, providing a consumer configuration in the JSON payload.
[kkakarla amq-streams-1.3.0-ocp-install-examples]$ curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com/consumers/my-group \
> -H 'content-type: application/vnd.kafka.v2+json' \
> -d '{
> "name": "my-consumer",
> "format": "json",
> "auto.offset.reset": "earliest",
> "enable.auto.commit": false
> }'
{"instance_id":"my-consumer","base_uri":"http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer"}
The bridge creates a new consumer in the group and returns the so-called base_uri to the client, which is the URL the client has to use for sending subsequent requests (i.e., subscribe, poll, and so on).
The HTTP consumer will interact with the following endpoints for subscribing to topics, getting messages, committing offsets, and finally deleting the consumer.
/consumers/{groupid}/instances/{name}/subscription
/consumers/{groupid}/instances/{name}/records
/consumers/{groupid}/instances/{name}/offsets
/consumers/{groupid}/instances/{name}
Subscribe
curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/subscription \
-H 'content-type: application/vnd.kafka.v2+json' \
-d '{
"topics": [
"test"
]
}'
Consume
As with a native Apache Kafka client, getting messages means performing a "poll" operation, which in terms of the HTTP protocol means making HTTP GET requests against the relevant endpoint; the bridge returns an array of records with the topic, key, value, partition, and offset.
[kkakarla amq-streams-1.3.0-ocp-install-examples]$ curl -X GET http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/records \
> -H 'accept: application/vnd.kafka.json.v2+json'
[{"topic":"test","key":"key-2","value":"value-2","partition":2,"offset":0}]
After consuming messages, if the auto-commit feature is not enabled on consumer creation, it is necessary to commit the offsets via an HTTP POST request specifying an offsets collection with topic, partition, and required offset to commit.
curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/offsets
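The command above is shown without a body; an illustrative payload (values mirroring the record consumed earlier, and assuming the usual Kafka convention of committing the offset of the next record to read) might look like:
curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/offsets \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
      {
        "topic": "test",
        "partition": 2,
        "offset": 1
      }
    ]
  }'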
The bridge also exposes endpoints for seeking into a topic partition at the beginning, at the end, or a specific offset.
/consumers/{groupid}/instances/{name}/positions
/consumers/{groupid}/instances/{name}/positions/beginning
/consumers/{groupid}/instances/{name}/positions/end
To seek to a specific position in the partition, the consumer must provide offset information through the JSON payload in the HTTP POST request. The format is the same as used to commit the offset.
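For example, seeking the consumer back to a specific position could look like this sketch (the partition and offset values are illustrative):
curl -X POST http://my-bridge-bridge-service-kafka.apps.xxx.redhat.com:80/consumers/my-group/instances/my-consumer/positions \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
      {
        "topic": "test",
        "partition": 2,
        "offset": 0
      }
    ]
  }'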
Conclusion
Exposing the Apache Kafka cluster to clients over HTTP enables scenarios where the use of native clients is not desirable, such as resource-constrained devices, limited network connectivity, or security considerations. Interaction with the bridge is similar to the native Apache Kafka clients, but using the semantics of an HTTP REST API. The inclusion of the HTTP bridge in Strimzi enhances the options available to developers when building applications with Apache Kafka.