
Interactive Queries in Apache Kafka


Interactive queries were designed to give developers access to the internal state, or state stores, of a stream processing application. In this post, we'll learn how to use them!


Apache Kafka 0.10 introduced a new feature, the Kafka Streams API: a client library for building applications and microservices whose input and output data are stored in Kafka clusters.

Kafka Streams provides state stores, which stream processing applications can use to store and query data. Every task in Kafka Streams uses one or more state stores, which can be accessed via APIs to store and query the data required for processing. These state stores can be a persistent key-value store, an in-memory hash map, or another convenient data structure. Kafka Streams also offers fault tolerance and automatic recovery for local state stores.

Kafka Streams allows read-only queries of these state stores by methods, processes, or external applications. This is provided through a feature called Interactive Queries. Interactive Queries were designed to give developers access to the internal state, or state stores, of a stream processing application (see KIP-67).

Eliminating the Need for an External Database

With Interactive Queries, we can eliminate the need for an external database by making the full state of the application queryable through the Interactive Query API.

The Kafka Streams library maintains its internal state through embedded databases (the default is RocksDB, but you can plug in your own). These embedded databases act as materialized views of the logs stored in Apache Kafka. With the help of Interactive Queries, we can expose this embedded state directly to applications.

Making a Kafka Streams Application Queryable

Kafka Streams handles most of the low-level querying, metadata discovery, and data fault tolerance. Depending on the application, you can either query directly with no extra work (local state stores) or implement a layer of indirection for distributed querying (remote state stores).

Querying Local State Stores

Since a Kafka Streams application typically runs on multiple instances, the state that is locally available on any given instance is only a subset of the entire state of the application. Querying the local state stores on an instance returns the data which is locally available on that particular instance.

The example below starts from a KStream of textLines and creates a key-value store named "CountsKeyValueStore," which holds the latest count for each word found in the topic "word-count-input."

// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));

// Create a key-value store named "CountsKeyValueStore" for all the word counts
// (count() produces Long values, so the store is typed <String, Long>)
groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsKeyValueStore"));

// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

After the application has started, you can query the "CountsKeyValueStore" using the ReadOnlyKeyValueStore API. It can be done in the following manner:

// Get the key-value store CountsKeyValueStore
// (typed <String, Long> to match the word counts it holds)
ReadOnlyKeyValueStore<String, Long> keyValueStore =
    streams.store("CountsKeyValueStore", QueryableStoreTypes.keyValueStore());
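With the store handle, you can issue point, range, and full-scan lookups via the ReadOnlyKeyValueStore API. This is a sketch that assumes the streams application above is running and reuses its keyValueStore variable; the word "hello" is an arbitrary example key:

```java
// Point lookup: latest count for the word "hello" (null if the word was never seen)
Long helloCount = keyValueStore.get("hello");

// Range scan over keys from "all" to "streams", inclusive.
// KeyValueIterator holds resources, so close it (here via try-with-resources).
try (KeyValueIterator<String, Long> range = keyValueStore.range("all", "streams")) {
    while (range.hasNext()) {
        KeyValue<String, Long> entry = range.next();
        System.out.println("count for " + entry.key + ": " + entry.value);
    }
}

// Full scan over every word counted so far
try (KeyValueIterator<String, Long> all = keyValueStore.all()) {
    all.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
}
```

Remember that these queries only see the partitions assigned to this instance; other words may be counted on other instances, which is where remote queries come in.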

Querying Remote State Stores

To query the state of the entire application, you must expose the application's full state to other applications, including applications running on different machines.

Below are the steps needed to query remote state stores and make the full state of the stream processing application queryable:

  • Add an RPC Layer to Your Application

    • This lets the instances of your application interact with each other over the network.
    • The RPC layer should be embedded within the Kafka Streams application and expose an endpoint through which other application instances and external applications can connect.
  • Expose the RPC Endpoints

    • To enable remote state store discovery in a distributed Kafka Streams application, you must expose the RPC endpoint of each application instance using the application.server configuration setting of Kafka Streams.
    • The value of this configuration property (a host:port pair) must be unique for each instance of your application.
    • Kafka Streams uses this setting to track the RPC endpoint of every instance of an application, along with its state stores and assigned stream partitions, through instances of StreamsMetadata.
  • Discover Remote Application Instances and Their State Stores

    • The RPC layer should discover remote application instances and their state stores, and combine their answers with locally available state stores, to make the full state of your application queryable.
    • Locally available state stores can respond to queries directly; when a particular instance lacks the local data to answer a query, it can forward the query to the instance that hosts it.
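The steps above can be sketched as follows. This is a minimal illustration, not a complete RPC layer: thisHost, thisPort, and fetchRemoteCount are hypothetical names standing in for whatever endpoint and forwarding mechanism (e.g. a small HTTP service) you embed in your application:

```java
// Step 2: advertise this instance's RPC endpoint to the other instances
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, thisHost + ":" + thisPort); // unique per instance
// ... build and start the KafkaStreams instance as before ...

// Step 3: discover which instance hosts the store partition for a given key
StreamsMetadata metadata =
    streams.metadataForKey("CountsKeyValueStore", "hello", Serdes.String().serializer());

Long count;
if (metadata.host().equals(thisHost) && metadata.port() == thisPort) {
    // The key is local: query the state store directly
    ReadOnlyKeyValueStore<String, Long> store =
        streams.store("CountsKeyValueStore", QueryableStoreTypes.keyValueStore());
    count = store.get("hello");
} else {
    // The key lives on another instance: forward the query over your RPC layer
    // (fetchRemoteCount is a hypothetical helper wrapping the remote call)
    count = fetchRemoteCount(metadata.host(), metadata.port(), "hello");
}
```

The key point is that metadataForKey uses the application.server values to tell you where a key's data lives, so each instance only needs its RPC layer to answer for its own partitions and forward everything else.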

References

This article was first published on the Knoldus blog.



Published at DZone with permission.
