Real-Time Stream Processing With Apache Kafka Part 4: Use Case

A final use case in our four-part series.

By Satish Sharma · Aug. 02, 19 · Tutorial · 20.6K Views

In the previous articles, we covered the basic terminology used in Kafka and Kafka Streams and set up a single-node Kafka cluster on a Windows machine. Now, based on the knowledge we have gained so far, let us try to build a use case.

Scenario

Consider a hypothetical fleet management company that needs a dashboard to gain insight into its day-to-day vehicle activity. Each vehicle in this fleet is fitted with a GPS-based geolocation emitter, which emits location data containing the following information:

  1. Vehicle ID: A unique ID given to each vehicle on registration with the company.

  2. Latitude and Longitude: The geolocation of the vehicle.

  3. Availability: Whether the vehicle is available to take a booking or not.

  4. Current Status: Whether the vehicle is on duty (Online) or off duty (Offline).

Functional Requirements

  1. The users should be able to know (in our case, via a REST API) the total number of vehicles online and offline at any point in time.

  2. TBD

Solution

There are multiple ways to meet these requirements. We will keep things simple and follow these steps:

  1. All GPS signals will be sent to a topic.

  2. Our stream processor will read the records from this topic and perform the required grouping, aggregation, and materialization to Kafka state-stores.

  3. A REST interface will be exposed to read and serve the data from the Kafka state stores created in the previous step.

Assumptions

Though we could publish the calculated values to topics and build auto-refreshing dashboards using WebSockets, or add windowing operations for more real-time results, we will keep things simple. We will process all the events from the beginning, and we are only targeting on-demand queries of the values, so we expose simple REST endpoints.
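
For reference only, a windowed variant of the status count (not used in this article) might look roughly like the sketch below. It assumes the gpslocation topic and the VehicleLocation POJO introduced later in this article, and the five-minute window size is an arbitrary choice for illustration.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.kafka.support.serializer.JsonSerde;

// Hypothetical sketch: a windowed status count, NOT part of this article's solution.
public class WindowedStatusCountSketch {

    public void build(StreamsBuilder streamsBuilder) {
        streamsBuilder
            .stream("gpslocation", Consumed.with(Serdes.Integer(), new JsonSerde<>(VehicleLocation.class)))
            .map((k, v) -> new KeyValue<>(v.isOnline() ? "Online" : "Offline", v))
            .groupByKey(Serialized.with(Serdes.String(), new JsonSerde<>(VehicleLocation.class)))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5))) // arbitrary 5-minute window for illustration
            .count(Materialized.as("statusCountWindowed"));    // windowed counts in their own state store
    }
}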

Architecture Diagram

[Architecture diagram: Real-time processing with Kafka]

Environment

Kafka (Topics and State Stores)

  • gpslocation: A Kafka topic that receives all the messages emitted by the vehicles.

  • statusCount: A state store that maintains the “Online” and “Offline” vehicle counts. This is a key-value store where the key is the status of the vehicle and the value is the count of vehicles with that status.

  • totalCount: A state store that maintains the total count of vehicles, obtained by counting the unique vehicle IDs across all messages.

Use this command to create the gpslocation topic:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gpslocation
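
To double-check that the topic was created, you can list the topics registered with the same ZooKeeper instance:

kafka-topics.bat --list --zookeeper localhost:2181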


Applications (Spring Boot)

Spring Boot offers rich libraries for interacting with Kafka. We will implement the following components as Spring Boot applications.

  • common-libs: Contains the POJO classes that model the different objects, such as Vehicle, VehicleCount, and VehicleLocation. This module is added as a dependency to the other two applications, eliminating concerns about model mismatch.

  • vehicle-simulator: Simulates the GPS signals emitted by the device fitted in each vehicle (a rough sketch of such a producer follows this list).

  • tracker-dashboard: Our stream processing component. We could have a separate application for querying the Kafka state stores, but for the sake of simplicity, we add that functionality to this application as well.

  • vehicle-tracker: The overall project wrapping all the other applications as modules.
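
The simulator code is not shown in this article (the full project is linked at the end), but as a rough idea of what it does, a minimal producer sketch using Spring's KafkaTemplate might look like the following. The class name, scheduling rate, and random values are illustrative assumptions, not code from the actual project, and it relies on the VehicleLocation POJO shown later.

import java.util.Random;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Illustrative sketch only; assumes @EnableScheduling is configured elsewhere.
@Component
public class VehicleSimulatorSketch {

    private final KafkaTemplate<Integer, VehicleLocation> kafkaTemplate;
    private final Random random = new Random();

    public VehicleSimulatorSketch(KafkaTemplate<Integer, VehicleLocation> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedRate = 1000) // emit one simulated GPS signal per second
    public void emitSignal() {
        int vehicleId = random.nextInt(100);              // pretend fleet of 100 vehicles
        VehicleLocation location = new VehicleLocation(
                random.nextBoolean(),                     // online or offline
                random.nextBoolean(),                     // available for booking or not
                random.nextDouble() * 180 - 90,           // latitude
                random.nextDouble() * 360 - 180);         // longitude
        location.setVehicleId(vehicleId);
        kafkaTemplate.send("gpslocation", vehicleId, location); // key the record by vehicle id
    }
}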

Show Me Some Code!

GPS Event

Below is the POJO class that models a GPS event sent by a vehicle.

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class VehicleLocation implements Serializable {

    private int vehicleId;      // unique id for each vehicle
    private boolean online;     // whether the vehicle is online or offline
    private boolean available;  // whether the vehicle is ready to take bookings or not
    private double latitude;
    private double longitude;

    public VehicleLocation(boolean online, boolean available, double latitude, double longitude) {
        super();
        this.online = online;
        this.available = available;
        this.latitude = latitude;
        this.longitude = longitude;
    }
}


Stream Processor

Below is our stream processor. It reads the records from the “gpslocation” topic and maintains the counts in the “statusCount” state store. The keys in this state store will be:

  1. Online: the key for the online vehicle count.

  2. Offline: the key for the offline vehicle count.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Component;

@Component
public class VehicleStatusCountProcessor {

    @Autowired
    private KafkaProperties kafkaProperties; // default Kafka properties from Spring Boot configuration

    @Bean // configure key-value serdes
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        return props;
    }

    @Bean
    public KStream<String, Long> statusCountStreamProcessor(StreamsBuilder streamsBuilder) {
        KStream<Integer, VehicleLocation> stream = streamsBuilder.stream("gpslocation", // read from the topic
                Consumed.with(Serdes.Integer(), new JsonSerde<>(VehicleLocation.class))); // using Integer and JSON serdes
        return stream
                .map((k, v) -> { // transform the key to Online/Offline based on status
                    String status = v.isOnline() ? "Online" : "Offline";
                    return new KeyValue<>(status, v);
                })
                .groupByKey(Serialized.with( // group by the newly mapped key
                        Serdes.String(),
                        new JsonSerde<>(VehicleLocation.class)))
                .count(Materialized.as("statusCount")) // materialize the count to a state store
                .toStream();
    }
}
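
The totalCount store described in the Environment section is not materialized by the processor above. A minimal sketch of one way to build it, assuming it is added inside the same statusCountStreamProcessor bean and reuses the existing stream variable (a topic should only be consumed by a single source in the topology), is shown below; the store-reading snippet is likewise an illustrative assumption.

// Hypothetical addition inside statusCountStreamProcessor, reusing the existing `stream`:
// counting records per vehicle id leaves one entry per distinct vehicle in the store.
stream.groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(VehicleLocation.class)))
      .count(Materialized.as("totalCount"));

// The number of distinct vehicles can then be read as the number of keys in the store:
ReadOnlyKeyValueStore<Integer, Long> totalStore = kStreamBuilderFactoryBean.getKafkaStreams()
        .store("totalCount", QueryableStoreTypes.keyValueStore());
long totalVehicles = totalStore.approximateNumEntries(); // approximate count of distinct vehicle ids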



Query the Store

Below is the code for the REST interface that fetches the state store and queries it for a specific key (Online/Offline).

import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class VehicleQueryService {

    @Autowired
    private StreamsBuilderFactoryBean kStreamBuilderFactoryBean;

    @GetMapping("/count/{status}")
    public long getVehicleCountForStatus(@PathVariable("status") String status) {
        // Get the state store and read the count for the given key (Online/Offline)
        ReadOnlyKeyValueStore<String, Long> keyValueStore = kStreamBuilderFactoryBean.getKafkaStreams()
                .store("statusCount", QueryableStoreTypes.keyValueStore());
        return keyValueStore.get(status);
    }
}
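
Once everything is running, the endpoint can be exercised from a terminal; the port below assumes the Spring Boot default of 8080, so adjust it if the tracker-dashboard application is configured differently.

curl http://localhost:8080/count/Online
curl http://localhost:8080/count/Offline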



Response for Offline and Online Vehicle Count

Query the localhost URL exposed by the tracker-dashboard application for the total events received as “Online”:

[Screenshot: Stream processing]

You should see the Online/Offline vehicle counts change as new records are published.

In this article, we have solved a simple use case; the same stream processing approach can be used to build full-fledged, real-time applications.

I hope this article left you with some insights into Kafka and its stream processing capabilities. The full code is available at this Git location. Please feel free to share your feedback and questions.
