
Proper Kubernetes Health Check for a Kafka Streams Application


Take Kafka for its checkup.


When you graze your cattle, you typically configure a health check to keep your herd alive. A very common livenessProbe sends a GET request to an endpoint: if the service replies with a 200, we’re fine; otherwise, the pod is destroyed and a new one is brought to life:

livenessProbe:
  httpGet:
    path: /health
    port: http


With Kafka Streams, it’s not that straightforward. A basic streams application reads data from a topic, performs transformations, and puts it back into another topic. In particular, it does not expose any information about its health.

Let’s go through a few possibilities, none of which is perfect, to pick the best-suited method for your specific situation. If you have other or better ideas, feel free to comment; I’d be more than happy to extend this post.


1. Create a Dedicated HTTP Endpoint

This sounds pretty easy. Along with your Kafka Streams app, run a Java HTTP server that exposes a health-check endpoint reporting the streams state:

import java.io.IOException;
import java.net.InetSocketAddress;

import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.streams.KafkaStreams;

class Health {

  private static final int OK = 200;
  private static final int ERROR = 500;

  private final KafkaStreams streams;
  private HttpServer server;

  Health(KafkaStreams streams) {
    this.streams = streams;
  }

  void start() {
    try {
      server = HttpServer.create(new InetSocketAddress(Config.HOST, Config.PORT), 0);
    } catch (IOException ioe) {
      throw new RuntimeException("Could not setup http server: ", ioe);
    }
    server.createContext("/health", exchange -> {
      int responseCode = streams.state().isRunning() ? OK : ERROR;
      exchange.sendResponseHeaders(responseCode, 0);
      exchange.close();
    });
    server.start();
  }

  void stop() {
    server.stop(0);
  }
}


Then, configure the streams app accordingly:

    KafkaStreams streams = new KafkaStreams(...);
    streams.start();

    Health health = new Health(streams);
    health.start();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      try {
        streams.close();
        log.info("Stream stopped");
        health.stop();
      } catch (Exception exc) {
        log.error("Got exception while executing shutdown hook: ", exc);
      }
    }));


This works just fine. If the stream is running, which means its state is either RUNNING or REBALANCING, the app will reply with a 200 response code and Kubernetes won’t touch the pod. In case of a failure, the pod will be re-instantiated.

The drawback of this approach is including an HTTP server in each Kafka Streams application.
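
To make the behavior of such an endpoint easier to try out without a broker, here is a minimal sketch that replaces the KafkaStreams dependency with a BooleanSupplier and adds a small client that applies the same rule Kubernetes uses for httpGet probes (a status of at least 200 and below 400 is a success). The class name HealthEndpointSketch and both method names are hypothetical and not part of the Health class above; in a real application the supplier would be something like () -> streams.state().isRunning().

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not part of the original Health class.
class HealthEndpointSketch {

  // Starts an HTTP server on an ephemeral loopback port.
  // `running` stands in for () -> streams.state().isRunning().
  static HttpServer start(BooleanSupplier running) {
    try {
      HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
      server.createContext("/health", exchange -> {
        int code = running.getAsBoolean() ? 200 : 500;
        exchange.sendResponseHeaders(code, -1); // -1 means: no response body
        exchange.close();
      });
      server.start();
      return server;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Mimics an httpGet probe: 200 <= status < 400 counts as success.
  static boolean probe(int port) {
    try {
      HttpURLConnection conn = (HttpURLConnection)
          URI.create("http://127.0.0.1:" + port + "/health").toURL().openConnection();
      conn.setConnectTimeout(1000);
      conn.setReadTimeout(1000);
      int status = conn.getResponseCode();
      conn.disconnect();
      return status >= 200 && status < 400;
    } catch (IOException e) {
      return false; // refused or timed-out connections count as a failed probe
    }
  }
}
```

Calling start(...) and then probe(server.getAddress().getPort()) shows the probe result flipping as soon as the supplier starts returning false, which is the moment Kubernetes would begin counting failures.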

2. JMX Based Health Check — 1st Attempt

The Kafka Streams application exposes metrics via JMX if started with the following params:

-Dcom.sun.management.jmxremote.port=5555 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false


When you connect with Java Monitoring & Management Console a.k.a. jconsole, you’ll get access to a couple of metrics:

Using jconsole to connect to the MBean Server exposed by the Kafka Streams application


Unfortunately, we’re not done yet, since the status of the app is not among them.
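
The inventory that jconsole shows can also be taken programmatically with the JDK's javax.management API. The following is only a sketch: the class name MBeanLister is hypothetical, and the pattern kafka.streams:* is an assumption derived from the object name used in this article.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical helper for illustration.
class MBeanLister {

  // Returns one "objectName -> attributeName" entry for every attribute of
  // every MBean whose name matches the pattern, e.g. "kafka.streams:*".
  static List<String> attributes(MBeanServerConnection conn, String pattern) {
    List<String> result = new ArrayList<>();
    try {
      for (ObjectName name : conn.queryNames(new ObjectName(pattern), null)) {
        for (MBeanAttributeInfo info : conn.getMBeanInfo(name).getAttributes()) {
          result.add(name + " -> " + info.getName());
        }
      }
    } catch (JMException | IOException e) {
      throw new IllegalStateException("Could not read MBeans", e);
    }
    return result;
  }
}
```

Against a remote application, the connection would come from JMXConnectorFactory.connect with a service URL pointing at port 5555; inside the application's own JVM, ManagementFactory.getPlatformMBeanServer() can be passed directly.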

One workaround is to monitor the count metric of the kafka.streams:type=kafka-metrics-count object. If it is higher than 1.0, I assume the stream is running:

count metric for a healthy stream


We’ve figured out that when the stream dies, the value for count is 1.0:

count metric for a dead stream


How can we build a health check around this knowledge? Kubernetes can run a command inside the container as a probe: if the command exits with 0, the app is treated as healthy; any other exit code counts as a failure.

To read MBeans, we can use Jmxterm, which is available for download. It can run in non-interactive mode reading a particular MBean attribute — that’s exactly our case. The command for the health check looks like this:

livenessProbe:
  exec:
    command:
    - healthcheck.sh


The healthcheck.sh script contains one command:

count=$(echo "get -s -b kafka.streams:type=kafka-metrics-count count" | java -jar jmxterm-1.0.0-uber.jar -l localhost:5555 -v silent -n)
if [ "$count" = "1.0" ]; then exit 1; fi


This approach has drawbacks: we need to provide the jmxterm jar file, as well as the script file in the Kubernetes pod. Let’s try to get rid of the script first.

3. JMX Based Health Check — 2nd Attempt

Since we only need one particular metric, I’ve written a dedicated Java app, which can be downloaded from here. If the value of the count attribute is 1.0, it throws an exception and finishes with a non-zero exit code.
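
The downloadable app is not reproduced here. As a rough idea of what such a check could look like, the sketch below uses only the JDK's JMX remote API. It is not the author's implementation: the class name StreamsJmxCheck is hypothetical, port 5555 and the MBean and attribute names are taken from the examples above, and treating every lookup failure as unhealthy is an assumption.

```java
import java.io.IOException;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical class name; a sketch, not the downloadable app.
class StreamsJmxCheck {

  // Address exposed by the jmxremote flags above (port 5555 as in the article).
  static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi";

  // True when the numeric attribute is greater than 1.0, the article's
  // heuristic for a running stream. Any lookup failure counts as unhealthy.
  static boolean isHealthy(MBeanServerConnection conn, String mbean, String attribute) {
    try {
      Object value = conn.getAttribute(new ObjectName(mbean), attribute);
      return value instanceof Number && ((Number) value).doubleValue() > 1.0;
    } catch (JMException | IOException e) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    boolean ok;
    try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL))) {
      ok = isHealthy(connector.getMBeanServerConnection(),
          "kafka.streams:type=kafka-metrics-count", "count");
    }
    // A non-zero exit code makes an exec probe fail.
    System.exit(ok ? 0 : 1);
  }
}
```

If the JMX connection itself cannot be established, the uncaught IOException also ends the JVM with a non-zero exit code, so an unreachable application is reported as unhealthy too.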

The health check command is no longer complex and can be typed directly in the command section:

livenessProbe:
  exec:
    command:
    - java
    - -jar
    - kafka-streams-health-check-1.0.0.jar


Still, the jar file needs to be a part of the pod. If you’re not happy to do that, let’s explore another solution.

4. Man Does Not Live by Health Check Alone

Killing the bad pod and starting a new one is not always what you want when a stream dies; it depends on the failure the stream encountered. Instead of reviving the pod automatically, you may prefer to receive an alert that the app is no longer running and fix the issue manually before starting the app again.

Kubernetes defines a quite common pattern called sidecar. You create a pod which consists of the main container, the Kafka Streams application, and an accompaniment, a jmx exporter application. The official helm charts shipped by Confluent follow this style.

The Kafka broker, Schema Registry, Rest Proxy, and KSQL have a jmx exporter on the side. Just take a look at this deployment descriptor configuring the prometheus-jmx-exporter container on the side of the main container running KSQL.

The benefit is that we follow a fairly standard approach, and we finally have more metrics to look into.

However, having a sidecar means we need to provide additional resources and budget for the additional container, one for every Kafka Streams app.

With a sidecar next to our Kafka Streams application, we may no longer need a health check. The benefit of relying only on metrics and alerts, and abandoning health checks, is that we don’t clutter our Kafka Streams application container with additional jar files.

There are no free lunches in life! We have to configure a monitoring system and alerts that notify us when the Kafka Streams app dies, so we need to go deeper.

Adding a Sidecar Container

First, let’s configure the Kafka Streams application to run inside a Docker container. As already mentioned, we also need to pass a few com.sun.management.jmxremote JVM arguments to expose JMX MBeans. Since it’s a good idea to automate the building and publishing of a Docker image, let’s use a great Gradle plugin and configure the Docker image and the application itself in the build.gradle file:

docker {
    javaApplication {
        if (System.getenv('GOOGLE_APPLICATION_CREDENTIALS')) {
            registryCredentials {
                url = 'https://eu.gcr.io'
                username = '_json_key'
                password = file(System.getenv('GOOGLE_APPLICATION_CREDENTIALS')).text
            }
        }
        baseImage = 'openjdk:11-jdk'
        maintainer = 'Grzegorz Kocur "grzegorz.kocur@softwaremill.com"'
        tag = 'eu.gcr.io/myproject/' + project.name.toLowerCase() + ':' + dockerVersion
        jvmArgs = [ "-Dcom.sun.management.jmxremote.port=5555",
                    "-Dcom.sun.management.jmxremote.authenticate=false",
                    "-Dcom.sun.management.jmxremote.ssl=false"]
    }
}


Please note — there is no need to set the java.rmi.server.hostname property, since, inside the pod, containers share their network namespaces and communicate with each other using localhost. This simplifies things a lot.

Now, it’s time for the JMX exporter sidecar. It needs a configuration file. A good practice is to decouple the application and its configuration. In Kubernetes, there is a dedicated object for this: the ConfigMap.

apiVersion: v1
kind: ConfigMap
metadata:
  name: jmx-exporter-config
data:
  jmx-prometheus.yml: |+
    jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
    lowercaseOutputName: true
    lowercaseOutputLabelNames: true
    ssl: false
    rules:
      - pattern: ".*"


It’s a very basic example — the jmx exporter will connect to the Kafka Streams application at localhost on port 5555 and read all metrics. It’s important to use the same port as in the main application configuration.

Once the ConfigMap is created in Kubernetes, we can consume it within a deployment by mounting it as a volume:

spec:
  template:
    spec:
      volumes:
        - name: jmx-config
          configMap:
            name: jmx-exporter-config


The last step is to add the sidecar container to the pod:

spec:
  template:
    spec:
      containers:
        - name: prometheus-jmx-exporter
          image: solsson/kafka-prometheus-jmx-exporter
          command:
            - java
            - -jar
            - jmx_prometheus_httpserver.jar
            - 5556
            - /etc/jmx-exporter/jmx-prometheus.yml
          ports:
            - containerPort: 5556
              name: metrics
          volumeMounts:
            - name: jmx-config
              mountPath: /etc/jmx-exporter


and expose it using a Kubernetes Service:

apiVersion: v1
kind: Service
metadata:
  name: kafka-stream-service
  labels:
    app: kafka-streams
spec:
  type: ClusterIP
  ports:
    - port: {{ .Values.prometheus.jmx.port }}
      targetPort: metrics
      name: metrics
      protocol: TCP
  selector:
    app: kafka-streams


We recommend using Prometheus Operator within a Kubernetes instance. Then, you can configure a ServiceMonitor to add the jmx exporter as a target for the Prometheus instance automatically:

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: jmx-exporter-service
spec:
  selector:
    matchLabels:
      app: kafka-streams
  endpoints:
    - port: metrics


Once Prometheus scrapes the metrics, we can create alerts and configure Alertmanager to send the required notifications over a communication channel of your choice; Slack or text messaging are good candidates.

Again, Prometheus Operator simplifies things a lot. All we have to do is to create a Kubernetes object of type PrometheusRule:

apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  labels:
    app: prometheus-operator
    release: prometheus-operator
  name: kafka-streams
spec:
  groups:
    - name: kafka-streams.rules
      rules:
        - alert: 
          annotations:
            message: "Kafka stream died for {{ $labels.job }}."
          expr: sum(kafka_streams_kafka_metrics_count_count) by (job) / count(kafka_streams_kafka_metrics_count_count) by (job) == 1
          for: 5m
          labels:
            severity: critical


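The expr field defines the alert trigger. For each job, it divides the sum of kafka_streams_kafka_metrics_count_count by the number of series, which gives the average across that job's instances. The alert fires when this average equals 1, that is, when every instance reports the dead-stream value of 1.0, and the condition has held for 5 minutes.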
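
The arithmetic behind the expression can be replayed outside Prometheus. The sketch below is only an illustration in Java, not how Prometheus evaluates rules: the class name AlertExprSketch is hypothetical, the input is the list of count values reported by the instances of a single job, and the for: 5m waiting period is deliberately not modeled.

```java
import java.util.List;

// Hypothetical illustration of: sum(m) by (job) / count(m) by (job) == 1
class AlertExprSketch {

  // valuesForJob holds one sample of the count metric per instance of a job.
  static boolean fires(List<Double> valuesForJob) {
    if (valuesForJob.isEmpty()) {
      return false; // no series for the job means the expression yields no result
    }
    double sum = 0;
    for (double value : valuesForJob) {
      sum += value;
    }
    // sum / count is the per-job average; the alert condition is average == 1
    return sum / valuesForJob.size() == 1.0;
  }
}
```

With two instances both reporting 1.0, the average is 1 and the condition holds. As soon as one instance reports a higher value, the average rises above 1 and the condition is false, so a job with at least one live stream does not trigger the alert.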

As already said, besides alerts we have the Kafka Streams application metrics in Prometheus and we can visualize them with Grafana:

Kafka streams


Have Your Cake and Eat it too

With both the metrics and a health check in place, we can keep the self-healing features of a Kubernetes pod and still be notified if reviving it fails continuously.

When the application dies because of a data mismatch, we could be notified after the third restart attempt that the app won’t run without our intervention.

Nuff Said

Monitoring Kafka Streams applications turned out not to be trivial. You need to decide whether viewing metrics, and possibly defining alerts, will satisfy your SLA requirements. In some cases an automated restart is required, and this is where the livenessProbe feature kicks in. Finally, mixing both approaches should provide the most confidence in your application’s availability and health.

If you want to take a step forward, have a look at scaling a Kafka Streams application automatically with Kubernetes.



Published at DZone with permission of
