Running Kafka in Kubernetes With KRaft Mode and SSL
In this article, learn how to launch an Apache Kafka cluster with the Apache Kafka Raft (KRaft) consensus protocol and SSL encryption.
Prerequisites
An understanding of Apache Kafka, Kubernetes, and Minikube.
The following steps were initially taken on a MacBook Pro with 32 GB memory running macOS Ventura v13.4.
Make sure to have the following applications installed:
- Docker v23.0.5
- Minikube v1.29.0 (running K8s v1.26.1 internally)
It's possible the steps below will work with different versions of the above tools, but if you run into unexpected issues, you'll want to ensure you have identical versions. Minikube was chosen for this exercise due to its focus on local development.
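Kafka brokers need a fair amount of memory, so it helps to start Minikube with more resources than its defaults. The figures below are only a suggestion and are not taken from the original setup:
# start a local cluster with enough memory and CPU for three brokers
minikube start --memory=8192 --cpus=4
# confirm the node is ready
kubectl get nodes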
Deployment Components
Server Keys and Certificates
The first step to enable SSL encryption is to create a public/private key pair for every server.
⚠️ The commands in this section were executed in a Docker container running the image openjdk:11.0.10-jre, because it matches the Java version (Java 11) that Confluent runs. This approach avoids any Java version-related issues.
The next commands were executed following the Confluent Security Tutorial:
docker run -it --rm \
--name openjdk \
--mount source=kafka-certs,target=/app \
openjdk:11.0.10-jre
Once in the Docker container:
keytool -keystore kafka-0.server.keystore.jks -alias kafka-0 -keyalg RSA -genkey
Output:
Enter keystore password:
Re-enter new password:
What is your first and last name?
  [Unknown]:  kafka-0.kafka-headless.kafka.svc.cluster.local
What is the name of your organizational unit?
  [Unknown]:  test
What is the name of your organization?
  [Unknown]:  test
What is the name of your City or Locality?
  [Unknown]:  Liverpool
What is the name of your State or Province?
  [Unknown]:  Merseyside
What is the two-letter country code for this unit?
  [Unknown]:  UK
Is CN=kafka-0.kafka-headless.kafka.svc.cluster.local, OU=test, O=test, L=Liverpool, ST=Merseyside, C=UK correct?
  [no]:  yes
Repeat the command for the remaining brokers:
keytool -keystore kafka-1.server.keystore.jks -alias kafka-1 -keyalg RSA -genkey
keytool -keystore kafka-2.server.keystore.jks -alias kafka-2 -keyalg RSA -genkey
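As an optional sanity check (not part of the Confluent tutorial), you can list the contents of a keystore to confirm the alias and the distinguished name you just entered:
# prompts for the keystore password and prints the certificate details
keytool -keystore kafka-0.server.keystore.jks -alias kafka-0 -list -v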
Create Your Own Certificate Authority (CA)
Generate a CA, which is simply a public-private key pair and certificate intended to sign other certificates:
openssl req -new -x509 -keyout ca-key -out ca-cert -days 90
Output:
Generating a RSA private key
...+++++
........+++++
writing new private key to 'ca-key'
Enter PEM pass phrase:
Verifying - Enter PEM pass phrase:
-----
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:UK
State or Province Name (full name) [Some-State]:Merseyside
Locality Name (eg, city) []:Liverpool
Organization Name (eg, company) [Internet Widgits Pty Ltd]:test
Organizational Unit Name (eg, section) []:test
Common Name (e.g. server FQDN or YOUR name) []:*.kafka-headless.kafka.svc.cluster.local
Email Address []:
Add the generated CA to the clients' truststore so that the clients can trust this CA:
keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
Add the generated CA to the brokers' truststore so that the brokers can trust this CA:
keytool -keystore kafka-0.server.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka-1.server.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka-2.server.truststore.jks -alias CARoot -importcert -file ca-cert
Sign the Certificate
To sign all certificates in the keystore with the CA that you generated:
- Export the certificate from the keystore:
keytool -keystore kafka-0.server.keystore.jks -alias kafka-0 -certreq -file cert-file-kafka-0
keytool -keystore kafka-1.server.keystore.jks -alias kafka-1 -certreq -file cert-file-kafka-1
keytool -keystore kafka-2.server.keystore.jks -alias kafka-2 -certreq -file cert-file-kafka-2
- Sign it with the CA:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-kafka-0 -out cert-signed-kafka-0 -days 90 -CAcreateserial -passin pass:${ca-password}
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-kafka-1 -out cert-signed-kafka-1 -days 90 -CAcreateserial -passin pass:${ca-password}
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-kafka-2 -out cert-signed-kafka-2 -days 90 -CAcreateserial -passin pass:${ca-password}
⚠️ Don't forget to substitute ${ca-password} with your CA password.
- Import both the certificate of the CA and the signed certificate into the broker keystore:
keytool -keystore kafka-0.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka-0.server.keystore.jks -alias kafka-0 -importcert -file cert-signed-kafka-0

keytool -keystore kafka-1.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka-1.server.keystore.jks -alias kafka-1 -importcert -file cert-signed-kafka-1

keytool -keystore kafka-2.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka-2.server.keystore.jks -alias kafka-2 -importcert -file cert-signed-kafka-2
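Because the per-broker commands above differ only in the broker index, they can also be run in a single loop. This is just a convenience sketch; it assumes the CA pass phrase has been exported as CA_PASSWORD and that all files live in the current directory:
for i in 0 1 2; do
  # export the CSR, sign it with the CA, then import the CA cert and the signed cert
  keytool -keystore "kafka-${i}.server.keystore.jks" -alias "kafka-${i}" -certreq -file "cert-file-kafka-${i}"
  openssl x509 -req -CA ca-cert -CAkey ca-key -in "cert-file-kafka-${i}" -out "cert-signed-kafka-${i}" -days 90 -CAcreateserial -passin pass:"${CA_PASSWORD}"
  keytool -keystore "kafka-${i}.server.keystore.jks" -alias CARoot -importcert -file ca-cert
  keytool -keystore "kafka-${i}.server.keystore.jks" -alias "kafka-${i}" -importcert -file "cert-signed-kafka-${i}"
done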
⚠️ The keystore and truststore files will be used to create the ConfigMap for our deployment.
ConfigMaps
Create two ConfigMaps: one for the Kafka broker and another for the Kafka client.
Kafka Broker
Create a local folder kafka-ssl and copy the keystore and truststore files into it. In addition, create a file broker_creds containing the ${ca-password}.
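If you generated the files inside the openjdk container, one way to get them onto your machine is docker cp. This sketch assumes the container is still running under the name openjdk and that the files were created in /app (the mount target used earlier):
# copy every generated keystore/truststore from the container into the local folder
docker cp openjdk:/app/. kafka-ssl/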
Your folder should look similar to this:
ls kafka-ssl
broker_creds
kafka-0.server.truststore.jks kafka-1.server.truststore.jks kafka-2.server.truststore.jks
kafka-0.server.keystore.jks kafka-1.server.keystore.jks kafka-2.server.keystore.jks
Create the ConfigMap:
kubectl create configmap kafka-ssl --from-file kafka-ssl -n kafka
kubectl describe configmaps -n kafka kafka-ssl
Output:
Name: kafka-ssl
Namespace: kafka
Labels: <none>
Annotations: <none>
Data
====
broker_creds:
----
<redacted>
BinaryData
====
kafka-0.server.keystore.jks: 5001 bytes
kafka-0.server.truststore.jks: 1306 bytes
kafka-1.server.keystore.jks: 5001 bytes
kafka-1.server.truststore.jks: 1306 bytes
kafka-2.server.keystore.jks: 5001 bytes
kafka-2.server.truststore.jks: 1306 bytes
Events: <none>
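Because keystores and their credentials are sensitive, a Kubernetes Secret would arguably be a better fit than a ConfigMap. The article uses a ConfigMap throughout, but an equivalent Secret could be created like this and mounted the same way (the StatefulSet volume would then reference a secret instead of a configMap):
kubectl create secret generic kafka-ssl --from-file kafka-ssl -n kafka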
Kafka Client
Create a local folder kafka-client and copy the kafka.client.truststore.jks file into it. In addition, create a file broker_creds containing the ${ca-password} and a file client_security.properties:
#client_security.properties
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=<redacted>
ssl.endpoint.identification.algorithm=
The empty ssl.endpoint.identification.algorithm disables server hostname verification, matching the ConfigMap contents shown below.
Your folder should look similar to this:
ls kafka-client
broker_creds client_security.properties kafka.client.truststore.jks
Create the ConfigMap:
kubectl create configmap kafka-client --from-file kafka-client -n kafka
kubectl describe configmaps -n kafka kafka-client
Output:
Name: kafka-client
Namespace: kafka
Labels: <none>
Annotations: <none>
Data
====
broker_creds:
----
<redacted>
client_security.properties:
----
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.endpoint.identification.algorithm=
BinaryData
====
kafka.client.truststore.jks: 1306 bytes
Events: <none>
Confluent Kafka
This YAML file deploys a Kafka cluster within a Kubernetes namespace named kafka. It defines the Kubernetes resources required to set up Kafka in a distributed manner.
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kafka
  namespace: kafka
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka
  name: kafka-headless
  namespace: kafka
spec:
  clusterIP: None
  clusterIPs:
    - None
  internalTrafficPolicy: Cluster
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack
  ports:
    - name: tcp-kafka-int
      port: 9092
      protocol: TCP
      targetPort: tcp-kafka-int
    - name: tcp-kafka-ssl
      port: 9093
      protocol: TCP
      targetPort: tcp-kafka-ssl
  selector:
    app: kafka
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: kafka
spec:
  podManagementPolicy: Parallel
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka-headless
  template:
    metadata:
      labels:
        app: kafka
    spec:
      serviceAccountName: kafka
      containers:
        - command:
            - sh
            - -exc
            - |
              export KAFKA_NODE_ID=${HOSTNAME##*-} && \
              export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_NAME}.kafka-headless.kafka.svc.cluster.local:9092,SSL://${POD_NAME}.kafka-headless.kafka.svc.cluster.local:9093
              export KAFKA_SSL_TRUSTSTORE_FILENAME=${POD_NAME}.server.truststore.jks
              export KAFKA_SSL_KEYSTORE_FILENAME=${POD_NAME}.server.keystore.jks
              export KAFKA_OPTS="-Djavax.net.debug=all"

              exec /etc/confluent/docker/run
          env:
            - name: KAFKA_SSL_KEY_CREDENTIALS
              value: "broker_creds"
            - name: KAFKA_SSL_KEYSTORE_CREDENTIALS
              value: "broker_creds"
            - name: KAFKA_SSL_TRUSTSTORE_CREDENTIALS
              value: "broker_creds"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL"
            - name: CLUSTER_ID
              value: "6PMpHYL9QkeyXRj9Nrp4KA"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "0@kafka-0.kafka-headless.kafka.svc.cluster.local:29093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:29093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:29093"
            - name: KAFKA_PROCESS_ROLES
              value: "broker,controller"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_NUM_PARTITIONS
              value: "3"
            - name: KAFKA_DEFAULT_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_MIN_INSYNC_REPLICAS
              value: "2"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093,SSL://0.0.0.0:9093
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
          name: kafka
          image: docker.io/confluentinc/cp-kafka:7.5.0
          imagePullPolicy: IfNotPresent
          livenessProbe:
            failureThreshold: 6
            initialDelaySeconds: 60
            periodSeconds: 60
            successThreshold: 1
            tcpSocket:
              port: tcp-kafka-int
            timeoutSeconds: 5
          ports:
            - containerPort: 9092
              name: tcp-kafka-int
              protocol: TCP
            - containerPort: 29093
              name: tcp-kafka-ctrl
              protocol: TCP
            - containerPort: 9093
              name: tcp-kafka-ssl
              protocol: TCP
          resources:
            limits:
              cpu: "1"
              memory: 1400Mi
            requests:
              cpu: 250m
              memory: 512Mi
          securityContext:
            allowPrivilegeEscalation: false
            capabilities:
              drop:
                - ALL
            runAsGroup: 1000
            runAsUser: 1000
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /etc/kafka/secrets/
              name: kafka-ssl
            - mountPath: /etc/kafka
              name: config
            - mountPath: /var/lib/kafka/data
              name: data
            - mountPath: /var/log
              name: logs
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        fsGroup: 1000
      terminationGracePeriodSeconds: 30
      volumes:
        - emptyDir: {}
          name: config
        - emptyDir: {}
          name: logs
        - name: kafka-ssl
          configMap:
            name: kafka-ssl
  updateStrategy:
    type: RollingUpdate
  volumeClaimTemplates:
    - apiVersion: v1
      kind: PersistentVolumeClaim
      metadata:
        name: data
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        storageClassName: standard
        volumeMode: Filesystem
      status:
        phase: Pending
The deployment we will create will have the following components:
- Namespace: kafka. This is the namespace within which all components will be scoped.
- Service Account: kafka. Service accounts are used to control permissions and access to resources within the cluster.
- Headless Service: kafka-headless. It exposes ports 9092 (for PLAINTEXT communication) and 9093 (for SSL traffic).
- StatefulSet: kafka. It manages Kafka pods and ensures they have stable hostnames and storage.
The source code for this deployment can be found in this GitHub repository.
Specifically for the SSL configuration, the following parameters were set in the StatefulSet (excerpted below):
- Configure the truststore, keystore, and password: KAFKA_SSL_KEY_CREDENTIALS, KAFKA_SSL_KEYSTORE_CREDENTIALS, KAFKA_SSL_TRUSTSTORE_CREDENTIALS
- Configure the ports on which the Kafka brokers listen for SSL: KAFKA_ADVERTISED_LISTENERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS
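For reference, this is how those settings appear in the manifest above: the credential and listener variables are plain env entries, while KAFKA_ADVERTISED_LISTENERS (and the keystore/truststore filenames) are exported per pod in the container start command.
- name: KAFKA_SSL_KEY_CREDENTIALS
  value: "broker_creds"
- name: KAFKA_SSL_KEYSTORE_CREDENTIALS
  value: "broker_creds"
- name: KAFKA_SSL_TRUSTSTORE_CREDENTIALS
  value: "broker_creds"
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
  value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL"
- name: KAFKA_LISTENERS
  value: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093,SSL://0.0.0.0:9093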
Creating the Deployment
Clone the repo:
git clone https://github.com/rafaelmnatali/kafka-k8s.git
cd ssl
Deploy Kafka using the following commands:
kubectl apply -f 00-namespace.yaml
kubectl apply -f 01-kafka-local.yaml
Verify Communication Across Brokers
There should now be three Kafka brokers, each running on a separate pod within your cluster. Name resolution for the headless service and the three pods within the StatefulSet is automatically configured by Kubernetes as they are created, allowing for communication across brokers. See the related documentation for more details on this feature.
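Before digging into logs, a quick check that all three broker pods are up can save time (not part of the original steps):
kubectl get pods -n kafka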
You can check the first pod's logs with the following command:
kubectl logs kafka-0
Name resolution for the three pods can take longer to become available than the pods take to start, so you may initially see UnknownHostException warnings in the pod logs:
WARN [RaftManager nodeId=2] Error connecting to node kafka-1.kafka-headless.kafka.svc.cluster.local:29093 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient) java.net.UnknownHostException: kafka-1.kafka-headless.kafka.svc.cluster.local ...
But eventually each pod will successfully resolve pod hostnames and end with a message stating the broker has been unfenced:
INFO [Controller 0] Unfenced broker: UnfenceBrokerRecord(id=1, epoch=176) (org.apache.kafka.controller.ClusterControlManager)
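A quick way to look for this message on each broker is to filter the logs, assuming your kubectl context is set to the kafka namespace (otherwise add -n kafka):
kubectl logs kafka-0 | grep -i unfenced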
Create a Topic Using the SSL Endpoint
The Kafka StatefulSet should now be up and running successfully, so we can create a topic using the SSL endpoint.
You can deploy the Kafka client using the following command:
kubectl apply -f 02-kafka-client.yaml
Check if the pod is running:
kubectl get pods
Output:
NAME        READY   STATUS    RESTARTS   AGE
kafka-cli   1/1     Running   0          12m
Connect to the pod kafka-cli:
kubectl exec -it kafka-cli -- bash
Create a topic named test-ssl with three partitions and a replication factor of 3.
kafka-topics --create --topic test-ssl --partitions 3 --replication-factor 3 --bootstrap-server ${BOOTSTRAP_SERVER} --command-config /etc/kafka/secrets/client_security.properties

Created topic test-ssl.
The environment variable BOOTSTRAP_SERVER contains the list of brokers, which saves us from typing it out each time.
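The article does not show the variable's value, but based on the SSL listeners defined earlier it would look something like this (an assumption, set inside the kafka-cli pod):
export BOOTSTRAP_SERVER=kafka-0.kafka-headless.kafka.svc.cluster.local:9093,kafka-1.kafka-headless.kafka.svc.cluster.local:9093,kafka-2.kafka-headless.kafka.svc.cluster.local:9093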
List all the topics in Kafka:
kafka-topics --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9093 --list --command-config /etc/kafka/secrets/client_security.properties

Output:
test
test-ssl
test-test
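To confirm that traffic really flows over the SSL listener, you could also produce and consume a few messages from the kafka-cli pod. This is a sketch using the standard Confluent console tools and the same client properties file:
# type a few messages, then exit with Ctrl-C
kafka-console-producer --topic test-ssl --bootstrap-server ${BOOTSTRAP_SERVER} --producer.config /etc/kafka/secrets/client_security.properties
# read the messages back from the beginning of the topic
kafka-console-consumer --topic test-ssl --bootstrap-server ${BOOTSTRAP_SERVER} --from-beginning --consumer.config /etc/kafka/secrets/client_security.properties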
Summary and Next Steps
In this article, you created your own certificate authority, signed a keystore for each of the three brokers, packaged the keystores and truststores into Kubernetes ConfigMaps, and deployed a three-broker Kafka cluster in KRaft mode with an SSL listener on port 9093, verifying it by creating and listing topics over the encrypted endpoint. Natural next steps include enabling client authentication (mTLS) and moving the keystore credentials into Kubernetes Secrets rather than ConfigMaps.