Kafka on Kubernetes, the Strimzi Way (Part 3)

Use the Strimzi operator to run Kafka on Kubernetes.

Over the course of the first two parts of this blog series, we set up a single-node Kafka cluster on Kubernetes, secured it using TLS encryption, and accessed the broker using both internal and external clients. Let's keep iterating! In this post, we will continue the Kafka on Kubernetes journey with Strimzi and cover:

  • How to apply different authentication types: TLS and SASL SCRAM-SHA-512
  • How to use the Strimzi Entity Operator to manage Kafka users and topics
  • How to configure Kafka CLI and Go client applications to securely connect to the Kafka cluster

The code is available on GitHub - https://github.com/abhirockzz/kafka-kubernetes-strimzi/

What Do I Need to Go Through This Tutorial?

kubectl - https://kubernetes.io/docs/tasks/tools/install-kubectl/

I will be using Azure Kubernetes Service (AKS) to demonstrate the concepts, but by and large it is independent of the Kubernetes provider (e.g. feel free to use a local setup such as minikube). If you want to use AKS, all you need is a Microsoft Azure account which you can get for FREE if you don't have one already.

I will not be repeating the common sections (such as installation and setup of Helm, Strimzi and Azure Kubernetes Service, or the Strimzi overview) in this or subsequent parts of the series; please refer to part one for those.

Create a Kafka Cluster With TLS Authentication

To enforce two-way (mutual) TLS auth, all we need to do is tweak the Strimzi Kafka resource. I am highlighting the key part below. The other parts remain the same (here is the manifest from part 2), i.e. single-node Kafka and ZooKeeper, and ephemeral storage along with TLS encryption.

    external:
      type: loadbalancer
      tls: true
      authentication:
        type: tls

All we did is add the tls authentication type as a property of the external listener. In addition to this, we also include the entityOperator configuration as such:

    entityOperator:
      userOperator: {}
      topicOperator: {}

This activates the Strimzi Entity Operator, which in turn comprises the Topic Operator and the User Operator. Just as the Kafka CRD allows you to control Kafka clusters on Kubernetes, the Topic Operator allows you to manage topics in a Kafka cluster through a custom resource called KafkaTopic, i.e. you can create, delete and update topics in your Kafka cluster.

The interesting part is that it's a two-way sync, i.e. you can still create topics by accessing the Kafka cluster directly, and the change is reflected in the corresponding KafkaTopic resources being created, updated or deleted.

The goal of the User Operator is to make Kafka user management easier with the help of a KafkaUser CRD. All you do is create KafkaUser resources and Strimzi takes care of the Kafka-specific user management parts.

Unlike the Topic Operator, this is not a two-way sync.

Read more about Entity Operator here https://strimzi.io/docs/operators/master/using.html#assembly-kafka-entity-operator-deployment-configuration-kafka

We will dive into the practical bit of these two operators in upcoming sections.

To create the Kafka cluster:

    kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/kafka-tls-auth.yaml

What Did the Strimzi Operator Do for Us in This Case?

We covered most of these in part 1: StatefulSet (and Pods), LoadBalancer Service, ConfigMap, Secret, etc. How is the TLS auth config enforced? To figure that out, let's introspect the Kafka server configuration.

As explained in part 1, this is stored in a ConfigMap:

    export CLUSTER_NAME=my-kafka-cluster
    kubectl get configmap/${CLUSTER_NAME}-kafka-config -o yaml

Look at the External listener section in server.config:

    listener.name.external-9094.ssl.client.auth=required
    listener.name.external-9094.ssl.truststore.location=/tmp/kafka/clients.truststore.p12
    listener.name.external-9094.ssl.truststore.password=${CERTS_STORE_PASSWORD}
    listener.name.external-9094.ssl.truststore.type=PKCS12

The snippet highlighted above is the part which was added - notice listener.name.external-9094.ssl.client.auth=required was added along with the truststore details.
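
If you would rather check this programmatically than by eye, here is a minimal Go sketch. It assumes you have already copied the server.config text out of the ConfigMap; the helper names parseProperties and requiresClientAuth are made up for this illustration and are not part of Strimzi or Kafka.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseProperties reads simple key=value lines, skipping blank lines and # comments.
// It is a simplified reader, not a full Java properties parser.
func parseProperties(text string) map[string]string {
	props := make(map[string]string)
	scanner := bufio.NewScanner(strings.NewReader(text))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		key, value, found := strings.Cut(line, "=")
		if !found {
			continue
		}
		props[strings.TrimSpace(key)] = strings.TrimSpace(value)
	}
	return props
}

// requiresClientAuth reports whether the named listener enforces mutual TLS.
func requiresClientAuth(props map[string]string, listener string) bool {
	return props["listener.name."+listener+".ssl.client.auth"] == "required"
}

func main() {
	// Sample input taken from the listener section shown above.
	serverConfig := `
listener.name.external-9094.ssl.client.auth=required
listener.name.external-9094.ssl.truststore.type=PKCS12
`
	props := parseProperties(serverConfig)
	fmt.Println("client auth required:", requiresClientAuth(props, "external-9094"))
}
```

The listener name external-9094 comes straight from the config keys above; a listener without the ssl.client.auth entry is reported as not requiring client certificates.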

Let's Not Forget the Entity Operator

The Entity Operator runs a separate Deployment

    export CLUSTER_NAME=my-kafka-cluster
    kubectl get deployment $CLUSTER_NAME-entity-operator
    kubectl get pod -l=app.kubernetes.io/name=entity-operator

    NAME                                                READY   STATUS
    my-kafka-cluster-entity-operator-666f8758f6-gj54h   3/3     Running

The entity operator Pod runs three containers - topic-operator, user-operator, tls-sidecar

We have configured our cluster to authenticate client connections, but what about the user credentials which will be used by client apps?

Time to Use the User Operator!

The User Operator allows us to create KafkaUsers to represent client authentication credentials. As mentioned in the beginning of the blog post, supported authentication types include TLS and SCRAM-SHA-512. Behind the scenes, a Kubernetes Secret is created by Strimzi to store the credentials

OAuth 2.0 is also supported, but it's not handled by the User Operator.

Let's create a KafkaUser to store client credentials for TLS auth. Here is what the user info looks like:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaUser
    metadata:
      name: kafka-tls-client-credentials
      labels:
        strimzi.io/cluster: my-kafka-cluster
    spec:
      authentication:
        type: tls

We name the user kafka-tls-client-credentials, associate it with the Kafka cluster we created earlier (using the label strimzi.io/cluster: my-kafka-cluster) and specify the tls authentication type.

You can also define authorization rules (not covered in this blog) within a KafkaUser definition - see https://strimzi.io/docs/operators/master/using.html#type-KafkaUser-reference

    kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/user-tls-auth.yaml

Introspect the Secret (it has the same name as the KafkaUser):

    kubectl get secret/kafka-tls-client-credentials -o yaml

TLS Client Authentication

That's it! Now it's up to the client to use the credentials. We will use a Kafka CLI and Go client application to try this out. First things first:

Extract and Configure the User Credentials

    export KAFKA_USER_NAME=kafka-tls-client-credentials
    kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.crt}' | base64 --decode > user.crt
    kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.key}' | base64 --decode > user.key
    kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.p12}' | base64 --decode > user.p12
    kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.password}' | base64 --decode > user.password

Import the entry in user.p12 into another keystore

    export USER_P12_FILE_PATH=user.p12
    export USER_KEY_PASSWORD_FILE_PATH=user.password
    export KEYSTORE_NAME=kafka-auth-keystore.jks
    export KEYSTORE_PASSWORD=foobar
    export PASSWORD=`cat $USER_KEY_PASSWORD_FILE_PATH`

    sudo keytool -importkeystore -deststorepass $KEYSTORE_PASSWORD -destkeystore $KEYSTORE_NAME -srckeystore $USER_P12_FILE_PATH -srcstorepass $PASSWORD -srcstoretype PKCS12

    sudo keytool -list -alias $KAFKA_USER_NAME -keystore $KEYSTORE_NAME

Just like we did in part 2, TLS encryption config requires importing the cluster CA cert in the client truststore

Extract and Configure Server CA Cert

Extract the cluster CA certificate and password

    export CLUSTER_NAME=my-kafka-cluster

    kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt
    kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > ca.password

Import it into a truststore. I am using the built-in truststore which comes with a JDK (Java) installation, but this is just for convenience and you're free to use another truststore.

    export CERT_FILE_PATH=ca.crt
    export CERT_PASSWORD_FILE_PATH=ca.password

    # replace this with the path to your truststore

    export KEYSTORE_LOCATION=/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/lib/security/cacerts
    export PASSWORD=`cat $CERT_PASSWORD_FILE_PATH`

    # you will be prompted for the truststore password. for JDK truststore, the default password is "changeit"
    # Type yes in response to the 'Trust this certificate? [no]:' prompt

    sudo keytool -importcert -alias strimzi-kafka-cert -file $CERT_FILE_PATH -keystore $KEYSTORE_LOCATION -keypass $PASSWORD

    sudo keytool -list -alias strimzi-kafka-cert -keystore $KEYSTORE_LOCATION

You should now be able to authenticate to the Kafka cluster using the Kafka CLI client

Please note that the configuration steps for the Kafka CLI detailed below also work for Java clients, so feel free to try that out too.

Create Properties File for Kafka CLI Clients

Extract the LoadBalancer public IP for Kafka cluster

    export KAFKA_CLUSTER_NAME=my-kafka-cluster

    # the jsonpath expression is quoted so that the shell does not try to expand [0]
    kubectl get service/${KAFKA_CLUSTER_NAME}-kafka-external-bootstrap --output=jsonpath='{.status.loadBalancer.ingress[0].ip}'
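
If you want to do the same lookup from a program, the following Go sketch reads the same field from the Service JSON (for example, the output of the kubectl command above with -o json instead of the jsonpath expression). The service struct and the bootstrapAddress helper are illustrative names of my own, and the sample JSON is hand-written rather than real cluster output.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// service models only the fields of a Kubernetes Service that this sketch needs.
type service struct {
	Status struct {
		LoadBalancer struct {
			Ingress []struct {
				IP string `json:"ip"`
			} `json:"ingress"`
		} `json:"loadBalancer"`
	} `json:"status"`
}

// bootstrapAddress returns "<ip>:<port>" for the first load balancer ingress entry.
func bootstrapAddress(serviceJSON []byte, port int) (string, error) {
	var svc service
	if err := json.Unmarshal(serviceJSON, &svc); err != nil {
		return "", err
	}
	ingress := svc.Status.LoadBalancer.Ingress
	if len(ingress) == 0 || ingress[0].IP == "" {
		return "", errors.New("load balancer IP not assigned yet")
	}
	return fmt.Sprintf("%s:%d", ingress[0].IP, port), nil
}

func main() {
	// Hand-written sample; the IP is a placeholder value.
	sample := []byte(`{"status":{"loadBalancer":{"ingress":[{"ip":"20.43.176.7"}]}}}`)
	addr, err := bootstrapAddress(sample, 9094)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("bootstrap.servers=" + addr)
}
```

The error branch covers the period right after cluster creation, when the cloud provider may not have assigned a public IP to the LoadBalancer Service yet.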



Create a file called client-ssl-auth.properties with the following contents:

    bootstrap.servers=[LOADBALANCER_PUBLIC_IP]:9094
    security.protocol=SSL
    ssl.truststore.location=[TRUSTSTORE_LOCATION]
    ssl.truststore.password=changeit
    ssl.keystore.location=kafka-auth-keystore.jks
    ssl.keystore.password=foobar
    ssl.key.password=[contents of user.password file]

changeit is the default truststore password. Please use a different one if needed

Download Kafka if you don't have it already - https://kafka.apache.org/downloads

One last thing before you proceed

Create a KafkaTopic

As I mentioned earlier, the Topic Operator makes it possible to describe a topic in the form of a KafkaTopic manifest, as such:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaTopic
    metadata:
      name: strimzi-test-topic
      labels:
        strimzi.io/cluster: my-kafka-cluster
    spec:
      partitions: 3
      replicas: 1

To create the topic:

    kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/topic.yaml

Here is the reference for a KafkaTopic CRD https://strimzi.io/docs/operators/master/using.html#type-KafkaTopic-reference

All you need to do is run the kafka-console-producer and kafka-console-consumer, pointing them to the client-ssl-auth.properties file you just created:

    export KAFKA_HOME=[replace with kafka installation]    # e.g. /Users/foobar/kafka_2.12-2.3.0
    export LOADBALANCER_PUBLIC_IP=[replace with public-ip]
    export TOPIC_NAME=strimzi-test-topic

    # on a terminal, start producer and send a few messages
    $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --producer.config client-ssl-auth.properties

    # on another terminal, start consumer
    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --consumer.config client-ssl-auth.properties --from-beginning

You should see producer and consumer working in tandem. Great!

If you face SSL handshake errors, please check whether the keys and certificates have been correctly imported and that you're using the correct password. If the Kafka cluster is not reachable, ensure you are using the right value for the public IP.

Now, let's try a programmatic client. Since the Java client behavior (required config properties) is the same as the CLI, I am using a Go client to try something different. Don't worry if you are not a Go programmer; it should be easy to follow along.

I will not walk through the entire program, just the part where we create the connection related configuration. Here is the snippet:

    bootstrapServers = os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
    caLocation = os.Getenv("CA_CERT_LOCATION")
    topic = os.Getenv("KAFKA_TOPIC")

    userCertLocation = os.Getenv("USER_CERT_LOCATION")
    userKeyLocation = os.Getenv("USER_KEY_LOCATION")
    userKeyPassword = os.Getenv("USER_KEY_PASSWORD")

    producerConfig := &kafka.ConfigMap{
        "bootstrap.servers":        bootstrapServers,
        "security.protocol":        "SSL",
        "ssl.ca.location":          caLocation,
        "ssl.certificate.location": userCertLocation,
        "ssl.key.location":         userKeyLocation,
        "ssl.key.password":         userKeyPassword,
    }

Notice that the bootstrap.servers and security.protocol are the same as ones you used in the Kafka CLI client (same for Java as well).

  • For TLS encryption: ssl.ca.location is used to point to the CA certificate directly as opposed to a truststore
  • For client authentication: ssl.certificate.location, ssl.key.location and ssl.key.password refer to the user certificate, user key and password respectively
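
Since every one of these settings comes from an environment variable, a missing export tends to surface later as a confusing connection error. As a hedged illustration (this check is my own addition and may not exist in the repository's client), a small fail-fast helper could look like this. The variable names match the snippet above; missingEnv and requiredEnv are hypothetical names.

```go
package main

import (
	"fmt"
	"os"
)

// requiredEnv lists the variables read by the TLS client snippet.
var requiredEnv = []string{
	"KAFKA_BOOTSTRAP_SERVERS",
	"CA_CERT_LOCATION",
	"KAFKA_TOPIC",
	"USER_CERT_LOCATION",
	"USER_KEY_LOCATION",
	"USER_KEY_PASSWORD",
}

// missingEnv returns the names for which getenv yields an empty string.
// getenv is injected so the check can be exercised without the real environment.
func missingEnv(getenv func(string) string, names []string) []string {
	var missing []string
	for _, name := range names {
		if getenv(name) == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	if missing := missingEnv(os.Getenv, requiredEnv); len(missing) > 0 {
		fmt.Fprintln(os.Stderr, "missing environment variables:", missing)
		os.Exit(1)
	}
	fmt.Println("all TLS client settings are present")
}
```

Running a check like this before building the producer configuration reports every unset variable in one go instead of one failure at a time.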

If you have Go installed, you can try it out. Clone the Git repo

    git clone https://github.com/abhirockzz/kafka-kubernetes-strimzi
    # the clone creates a kafka-kubernetes-strimzi directory, so change into it first
    cd kafka-kubernetes-strimzi/part-3/go-client-app

...and run the program:

    export KAFKA_BOOTSTRAP_SERVERS=[replace with public-ip:9094]      # e.g. 20.43.176.7:9094
    export CA_CERT_LOCATION=[replace with location of ca.crt file]   # e.g. /Users/code/kafka-kubernetes-strimzi/part-3/ca.crt
    export KAFKA_TOPIC=strimzi-test-topic

    export USER_CERT_LOCATION=[path to user.crt file]                # e.g. /Users/code/kafka-kubernetes-strimzi/part-3/user.crt
    export USER_KEY_LOCATION=[path to user.key file]                 # e.g. /Users/code/kafka-kubernetes-strimzi/part-3/user.key
    export USER_KEY_PASSWORD=[contents of user.password file]

    go run kafka-tls-auth-client.go

The logs should confirm whether messages are being produced and consumed

Enforce SCRAM-SHA-512 Auth

SCRAM stands for "Salted Challenge Response Authentication Mechanism". I will not pretend to be a security or SCRAM expert, but I do want to highlight that it is one of the supported and commonly used authentication mechanisms in Kafka (in addition to others such as PLAIN).

Please note that Strimzi does not support SASL PLAIN auth at the time of writing

Update the Kafka Cluster

To apply the SCRAM authentication scheme - all you need is to set the authentication.type to scram-sha-512

    external:
      type: loadbalancer
      tls: true
      authentication:
        type: scram-sha-512

Update the Kafka cluster to use SCRAM-SHA-512 authentication:

    # whichever manifest you apply here must set authentication.type to scram-sha-512 on the external listener
    kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/kafka-tls-auth.yaml

Let's take a look at what the Kafka server config looks like in this case:

    export CLUSTER_NAME=my-kafka-cluster
    kubectl get configmap/${CLUSTER_NAME}-kafka-config -o yaml

Introspect the External listener section in server.config and notice how the config has been updated to reflect SCRAM-SHA-512 authentication:

    listener.name.external-9094.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;
    listener.name.external-9094.sasl.enabled.mechanisms=SCRAM-SHA-512

Create SCRAM Credentials (KafkaUser)

Just like we did with TLS auth, we need to create client credentials for SCRAM as well. It only differs from its TLS equivalent in terms of name and the type (of course!)

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaUser
    metadata:
      name: kafka-scram-client-credentials
      labels:
        strimzi.io/cluster: my-kafka-cluster
    spec:
      authentication:
        type: scram-sha-512

Notice that authentication.type is scram-sha-512.

Create the KafkaUser

    kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/user-scram-auth.yaml

Introspect the Secret (it has the same name as the KafkaUser):

    kubectl get secret/kafka-scram-client-credentials -o yaml

The Secret contains the password in base64 encoded form

    apiVersion: v1
    kind: Secret
    metadata:
      name: kafka-scram-client-credentials
    data:
      password: SnpteEQwek1DNkdi
    ...

The username is the same as the KafkaUser/Secret name, which is kafka-scram-client-credentials in this example.

Run Client Applications

In order to run the client examples, download the password:

    export USER_NAME=kafka-scram-client-credentials
    kubectl get secret $USER_NAME -o jsonpath='{.data.password}' | base64 --decode > user-scram.password
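
The base64 step is easy to reproduce in code as well. Below is a small Go sketch that pulls a decoded value out of a Secret given as JSON (as you would get from kubectl get secret ... -o json). It relies on the fact that Go's encoding/json decodes base64 strings into []byte fields; the secret struct and secretValue helper are names invented for this example, and the sample document reuses the abbreviated password value shown earlier.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// secret models only the data section of a Kubernetes Secret.
// encoding/json base64-decodes JSON strings when the target type is []byte.
type secret struct {
	Data map[string][]byte `json:"data"`
}

// secretValue returns the decoded value stored under key in the Secret JSON.
func secretValue(secretJSON []byte, key string) (string, error) {
	var s secret
	if err := json.Unmarshal(secretJSON, &s); err != nil {
		return "", err
	}
	value, ok := s.Data[key]
	if !ok {
		return "", errors.New("key not found in secret: " + key)
	}
	return string(value), nil
}

func main() {
	// Abbreviated sample Secret; real output contains more fields.
	sample := []byte(`{"kind":"Secret","data":{"password":"SnpteEQwek1DNkdi"}}`)
	password, err := secretValue(sample, "password")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("decoded password length:", len(password))
}
```

The sketch prints only the length so that the credential does not end up in terminal history or logs; the same helper would work for the user.crt, user.key and user.password entries of the TLS Secret.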



To test the Kafka CLI client, create a file client-scram-auth.properties with the following contents:

    bootstrap.servers=[replace with public-ip:9094]
    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512
    ssl.truststore.location=[replace with path to truststore with kafka CA cert]
    # "changeit" is the default password for JDK truststore, please use the one applicable to yours
    ssl.truststore.password=changeit
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-scram-client-credentials" password="[replace with contents of user-scram.password file]";

Refer to the instructions above to run the console producer and consumer

Please make sure you use the client-scram-auth.properties file and not the client-ssl-auth.properties file created for TLS authentication.
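
The sasl.jaas.config entry in client-scram-auth.properties is the fiddliest line to type by hand because of its nested quotes and trailing semicolon. Here is a minimal Go sketch that assembles it from a username and password. The function name scramJaasConfig is hypothetical, example-password is a placeholder, and the sketch deliberately rejects values that would need escaping rather than guessing at escaping rules.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// scramJaasConfig builds the sasl.jaas.config value for SCRAM authentication.
// To keep the sketch simple, it rejects characters that would need escaping
// inside the double-quoted JAAS values.
func scramJaasConfig(username, password string) (string, error) {
	for _, v := range []string{username, password} {
		if v == "" || strings.ContainsAny(v, "\"\\\n") {
			return "", errors.New("username and password must be non-empty and free of quotes, backslashes and newlines")
		}
	}
	return fmt.Sprintf(
		`org.apache.kafka.common.security.scram.ScramLoginModule required username="%s" password="%s";`,
		username, password), nil
}

func main() {
	// "example-password" is a placeholder, not a real credential.
	value, err := scramJaasConfig("kafka-scram-client-credentials", "example-password")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("sasl.jaas.config=" + value)
}
```

In practice the password argument would be the contents of the user-scram.password file extracted earlier.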

Before wrapping up, let's look at the Go client and see how it handles SCRAM authentication. As always, I will only highlight the part which showcases the configuration:

    bootstrapServers = os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
    caLocation = os.Getenv("CA_CERT_LOCATION")
    topic = os.Getenv("KAFKA_TOPIC")

    kafkaScramUsername = os.Getenv("SCRAM_USERNAME")
    kafkaScramPassword = os.Getenv("SCRAM_PASSWORD")

    producerConfig := &kafka.ConfigMap{
        "bootstrap.servers": bootstrapServers,
        "security.protocol": "SASL_SSL",
        "ssl.ca.location":   caLocation,
        "sasl.mechanism":    "SCRAM-SHA-512",
        "sasl.username":     kafkaScramUsername,
        "sasl.password":     kafkaScramPassword,
    }

The security.protocol and sasl.mechanism have been updated to SASL_SSL and SCRAM-SHA-512 respectively. Along with that, we use the sasl.username and sasl.password to specify the client credentials
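
To make the difference between the two authentication types easier to see side by side, here is a standalone Go sketch that builds both sets of settings. It uses a plain map[string]string as a stand-in for kafka.ConfigMap so that it runs without the Kafka client library; the function names are my own, the property keys are the ones used in the two snippets in this post, and example-password is a placeholder.

```go
package main

import (
	"fmt"
	"sort"
)

// baseConfig holds the settings shared by both authentication types.
func baseConfig(bootstrapServers, caLocation string) map[string]string {
	return map[string]string{
		"bootstrap.servers": bootstrapServers,
		"ssl.ca.location":   caLocation,
	}
}

// tlsAuthConfig adds the client certificate settings used for TLS authentication.
func tlsAuthConfig(bootstrapServers, caLocation, certLocation, keyLocation, keyPassword string) map[string]string {
	cfg := baseConfig(bootstrapServers, caLocation)
	cfg["security.protocol"] = "SSL"
	cfg["ssl.certificate.location"] = certLocation
	cfg["ssl.key.location"] = keyLocation
	cfg["ssl.key.password"] = keyPassword
	return cfg
}

// scramAuthConfig adds the SASL settings used for SCRAM-SHA-512 authentication.
func scramAuthConfig(bootstrapServers, caLocation, username, password string) map[string]string {
	cfg := baseConfig(bootstrapServers, caLocation)
	cfg["security.protocol"] = "SASL_SSL"
	cfg["sasl.mechanism"] = "SCRAM-SHA-512"
	cfg["sasl.username"] = username
	cfg["sasl.password"] = password
	return cfg
}

func main() {
	cfg := scramAuthConfig("20.43.176.7:9094", "ca.crt", "kafka-scram-client-credentials", "example-password")
	keys := make([]string, 0, len(cfg))
	for k := range cfg {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random, so sort for stable output
	for _, k := range keys {
		if k == "sasl.password" {
			fmt.Println(k, "= <redacted>")
			continue
		}
		fmt.Println(k, "=", cfg[k])
	}
}
```

Both variants keep ssl.ca.location because the connection is TLS-encrypted either way; only the way the client proves its identity changes.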

To run the Go client app:

    export KAFKA_BOOTSTRAP_SERVERS=[replace with public-ip:9094]
    export CA_CERT_LOCATION=[path to ca.crt file]    # e.g. /Users/code/kafka-kubernetes-strimzi/part-3/ca.crt
    export KAFKA_TOPIC=strimzi-test-topic

    export SCRAM_USERNAME=kafka-scram-client-credentials
    export SCRAM_PASSWORD=[contents of user-scram.password file]

    go run kafka-scram-auth-client.go

Wrap Up...For Now

This post covered a decent amount of ground! We learnt how to apply different authentication types, used the Entity Operator to manage Kafka users and topics and, more importantly, understood how client applications need to be configured to connect securely using a combination of TLS encryption and the chosen authentication scheme.

We're far from done! All this while, we've been creating ephemeral clusters with no persistence — we will fix that in upcoming posts.
