Building Robust Real-Time Data Pipelines With Python, Apache Kafka, and the Cloud
This Apache Kafka guide covers architecture, cloud deployment, Python data pipelines, PySpark scaling, and real-world examples.
In today's highly competitive landscape, businesses must be able to gather, process, and react to data in real time in order to survive and thrive. Whether it's detecting fraud, personalizing user experiences, or monitoring systems, near-instant data is now a need, not a nice-to-have.
However, building and running mission-critical, real-time data pipelines is challenging. The infrastructure must be fault-tolerant, infinitely scalable, and integrated with various data sources and applications. This is where leveraging Apache Kafka, Python, and cloud platforms comes in handy.
In this comprehensive guide, we will cover:
- An overview of Apache Kafka architecture
- Running Kafka clusters on the cloud
- Building real-time data pipelines with Python
- Scaling processing using PySpark
- Real-world examples like user activity tracking, IoT data pipeline, and support chat analysis
We will include plenty of code snippets, configuration examples, and links to documentation along the way for you to get hands-on experience with these incredibly useful technologies.
Let's get started!
Apache Kafka Architecture 101
Apache Kafka is a distributed, partitioned, replicated commit log for storing streams of data reliably and at scale. At its core, Kafka provides the following capabilities:
- Publish-subscribe messaging: Kafka lets you broadcast streams of data like page views, transactions, user events, etc., from producers and consume them in real-time using consumers.
- Message storage: Kafka durably persists messages on disk as they arrive and retains them for specified periods. Messages are stored and indexed by an offset indicating the position in the log.
- Fault tolerance: Data is replicated across a configurable number of servers. If a server goes down, a replica takes over, ensuring continuous operation.
- Horizontal scalability: Kafka clusters can be elastically scaled by simply adding more servers, growing storage and processing capacity with demand.
Kafka architecture consists of the following main components:
Topics
Messages are published to categories called topics. Each topic acts as a feed or queue of messages. A common scenario is a topic per message type or data stream. Each message in a Kafka topic has a unique identifier called an offset, which represents its position in the topic. A topic can be divided into multiple partitions, which are segments of the topic that can be stored on different brokers. Partitioning allows Kafka to scale and parallelize the data processing by distributing the load among multiple consumers.
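As a quick illustration (a minimal sketch, not part of the article's pipeline; the partition and replication counts below are arbitrary), a partitioned topic can be created programmatically with the confluent_kafka admin client:

```python
from confluent_kafka.admin import AdminClient, NewTopic

# Connect to the cluster (address is a placeholder)
admin = AdminClient({'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092'})

# Create a 'clickstream' topic with 3 partitions, each replicated on 2 brokers
futures = admin.create_topics([NewTopic('clickstream', num_partitions=3, replication_factor=2)])

# create_topics() is asynchronous; wait for each result and report it
for topic, future in futures.items():
    try:
        future.result()
        print(f"Created topic {topic}")
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")
```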
Producers
These are applications that publish messages to Kafka topics. They connect to the Kafka cluster, serialize data (say, to JSON or Avro), assign a key, and send it to the appropriate topic.
For example, a web app can produce clickstream events, or a mobile app can produce usage stats.
Consumers
Consumers read messages from Kafka topics and process them. Processing may involve parsing data, validation, aggregation, filtering, storing to databases, etc.
Consumers connect to the Kafka cluster and subscribe to one or more topics to get feeds of messages, which they then handle as per the use case requirements.
Brokers
A broker is a Kafka server that receives messages from producers, assigns offsets, commits messages to storage, and serves data to consumers. Kafka clusters consist of multiple brokers for scalability and fault tolerance.
ZooKeeper
ZooKeeper handles coordination and consensus between brokers, such as controller election and topic configuration. It maintains the cluster state and configuration info required for Kafka operations.
This covers Kafka basics. For an in-depth understanding, refer to the excellent Kafka documentation.
Now, let's look at simplifying management by running Kafka in the cloud.
Kafka in the Cloud
While Kafka is highly scalable and reliable, operating it involves significant effort related to deployment, infrastructure management, monitoring, security, failure handling, upgrades, etc.
Thankfully, Kafka is now available as a fully managed service from all major cloud providers:
| Service | Description | Pricing |
|---|---|---|
| AWS MSK | Fully managed, highly available Apache Kafka clusters on AWS. Handles infrastructure, scaling, security, failure handling, etc. | Based on the number of brokers |
| Google Cloud Pub/Sub | Serverless, real-time messaging service with auto-scaling and at-least-once delivery guarantees. | Based on usage metrics |
| Confluent Cloud | Fully managed event streaming platform powered by Apache Kafka. Free tier available. | Tiered pricing based on features |
| Azure Event Hubs | High-throughput event ingestion service with a Kafka-compatible endpoint. Integrations with Azure data services. | Based on throughput units |
The managed services abstract away the complexities of Kafka operations and let you focus on your data pipelines.
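In practice, connecting a client to a managed cluster usually requires authentication settings on top of the bootstrap servers. As a rough sketch with the confluent_kafka client (the endpoint and credentials below are placeholders obtained from your provider's console, and the exact settings vary by service):

```python
from confluent_kafka import Producer

# SASL/SSL settings commonly required by managed Kafka services (placeholder values)
conf = {
    'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '<API_KEY>',
    'sasl.password': '<API_SECRET>',
    'client.id': 'clickstream-producer'
}

producer = Producer(conf)
```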
Next, we will build a real-time pipeline with Python, Kafka, and the cloud.
Building Real-Time Data Pipelines
A basic real-time pipeline with Kafka has two main components: a producer that publishes messages to Kafka and a consumer that subscribes to topics and processes the messages.
The architecture follows a simple flow: producers publish events to Kafka topics on the cloud-hosted cluster, and consumers subscribe to those topics and process the messages as they arrive.
We will use the Confluent Kafka Python client library for simplicity.
1. Python Producer
The producer application gathers data from sources and publishes it to Kafka topics. As an example, let's say we have a Python service collecting user clickstream events from a web application.
In a web application, when a user performs an action such as a page view or product rating, we can capture these events and send them to Kafka.
We can abstract the implementation details of how the web app collects the data.
```python
from confluent_kafka import Producer
import json

# User event data
event = {
    "timestamp": "2022-01-01T12:22:25",
    "userid": "user123",
    "page": "/product123",
    "action": "view"
}

# Convert to JSON
event_json = json.dumps(event)

# Kafka producer configuration
conf = {
    'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
    'client.id': 'clickstream-producer'
}

# Create producer instance
producer = Producer(conf)

# Publish event to the 'clickstream' topic
producer.produce(topic='clickstream', value=event_json)

# Block until all buffered messages have been delivered
# (the confluent_kafka Producer has no close() method; flush() is sufficient)
producer.flush()
```
This publishes the event to the `clickstream` topic on our cloud-hosted Kafka cluster.
The confluent_kafka Python client uses an internal buffer to batch messages before sending them to Kafka. This improves efficiency compared to sending each message individually.
By default, messages are accumulated in an internal queue and sent asynchronously in the background. A batch is transmitted when either:
- The batching window elapses or a batch fills up (controlled by settings such as linger.ms and batch.size).
- The flush() method is called.
When flush() is called, the producer blocks until every message still in the queue has been delivered to the Kafka broker.
If we never called flush() and the application crashed or exited, any messages still sitting in the queue would be lost. Calling flush() gives us greater control to minimize potential message loss.
However, calling flush() after every single produce() call introduces additional overhead, since each call blocks until delivery completes. Finding the right buffering configuration depends on our specific reliability needs and throughput requirements.
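A common middle ground, sketched below under assumptions not taken from the article (the linger.ms and acks values are illustrative, and event_stream is a hypothetical iterable of JSON strings), is to attach a delivery callback to each produce() call, serve it with poll(), and flush only on shutdown:

```python
from confluent_kafka import Producer

def delivery_report(err, msg):
    """Called once per message to report delivery success or failure."""
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

producer = Producer({
    'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
    'client.id': 'clickstream-producer',
    'linger.ms': 50,   # wait up to 50 ms to batch messages (illustrative value)
    'acks': 'all'      # require acknowledgment from all in-sync replicas
})

for event_json in event_stream:  # event_stream: hypothetical iterable of JSON strings
    producer.produce('clickstream', value=event_json, callback=delivery_report)
    producer.poll(0)             # serve delivery callbacks without blocking

producer.flush()                 # block until all outstanding messages are delivered
```

This keeps producing non-blocking while still surfacing per-message delivery failures as they happen.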
We can keep adding events as they occur to build a live stream. This gives downstream data consumers a continual feed of events.
2. Python Consumer
Next, we have a consumer application to ingest events from Kafka and process them.
For example, we may want to parse events, filter for a certain subtype, and validate schema.
```python
from confluent_kafka import Consumer
import json

# Kafka consumer configuration
conf = {
    'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
    'group.id': 'clickstream-processor',
    'auto.offset.reset': 'earliest'
}

# Create consumer instance
consumer = Consumer(conf)

# Subscribe to 'clickstream' topic
consumer.subscribe(['clickstream'])

# Poll Kafka for messages until interrupted
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        # Parse JSON from message value
        event = json.loads(msg.value())

        # Process event based on business logic
        if event['action'] == 'view':
            print('User viewed product page')
        elif event['action'] == 'rating':
            # Validate rating, insert to DB, etc.
            pass

        print(event)  # Print event
finally:
    # Close consumer and commit final offsets
    consumer.close()
```
This polls the `clickstream` topic for new messages, consumes them, and takes action based on the event type (printing, updating a database, etc.).
For a simple pipeline, this works well. But what if we get 100x more events per second? The consumer will not be able to keep up. This is where a tool like PySpark helps scale out processing.
3. Scaling With PySpark
PySpark provides a Python API for Apache Spark, a distributed computing framework optimized for large-scale data processing.
With PySpark, we can leverage Spark's in-memory computing and parallel execution to consume Kafka streams faster.
First, we load Kafka data into a DataFrame, which can be manipulated using Spark SQL or Python.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

# Initialize Spark session
# (requires the spark-sql-kafka connector package on the classpath)
spark = SparkSession.builder \
    .appName('clickstream-consumer') \
    .getOrCreate()

# Schema matching the clickstream event JSON (DDL string form)
schema = "timestamp STRING, userid STRING, page STRING, action STRING"

# Read stream from the Kafka 'clickstream' topic
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "clickstream") \
    .load()

# Parse JSON from the message value
df = df.selectExpr("CAST(value AS STRING)")
df = df.select(from_json(col("value"), schema).alias("data"))
```
Next, we can express whatever processing logic we need using DataFrame transformations:
```python
from pyspark.sql.functions import col

# Filter for 'page view' events
views = df.filter(col("data.action") == "view")

# Count views per page URL, ordered by count
counts = views.groupBy(col("data.page")) \
    .count() \
    .orderBy("count")

# Print the stream to the console
query = counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
```
This applies operations like filter, aggregate, and sort on the stream in real-time, leveraging Spark's distributed runtime.
We can also parallelize consumption by adding topic partitions and running multiple consumers in the same consumer group, and write the output to sinks such as databases or cloud storage.
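As a rough sketch of a non-console sink (the Parquet format and storage paths below are illustrative assumptions, not part of the original pipeline), Structured Streaming's foreachBatch hook lets each micro-batch be written with the regular batch writer, with a checkpoint location for fault tolerance:

```python
# Hypothetical sink: write each micro-batch of aggregated counts to Parquet in cloud storage
def write_batch(batch_df, batch_id):
    batch_df.write \
        .mode("overwrite") \
        .parquet("s3a://my-bucket/clickstream/page_view_counts")

query = counts.writeStream \
    .outputMode("complete") \
    .foreachBatch(write_batch) \
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/clickstream") \
    .start()

query.awaitTermination()
```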
This allows us to build scalable stream processing on data from Kafka.
Now that we've covered the end-to-end pipeline, let's look at some real-world examples of applying it.
Real-World Use Cases
Let's explore some practical use cases where these technologies can help process huge amounts of real-time data at scale.
User Activity Tracking
Many modern web and mobile applications track user actions like page views, button clicks, transactions, etc., to gather usage analytics.
Problem
- Data volumes can scale massively with millions of active users.
- Need insights in real-time to detect issues and personalize content
- Want to store aggregate data for historical reporting
Solution
- Ingest clickstream events into Kafka topics using Python or any language.
- Process using PySpark for cleansing, aggregations, and analytics.
- Save output to databases like Cassandra for dashboards.
- Detect anomalies using Spark ML for real-time alerting.
IoT Data Pipeline
IoT sensors generate massive volumes of real-time telemetry like temperature, pressure, location, etc.
Problem
- Millions of sensor events per second
- Requires cleaning, transforming, and enriching
- Need real-time monitoring and historical storage
Solution
- Collect sensor data in Kafka topics using language SDKs.
- Use PySpark for data wrangling and joining external data (see the aggregation sketch after this list).
- Feed stream into ML models for real-time predictions.
- Store aggregate data in a time series database for visualization.
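For example, here is a minimal sketch of the aggregation step, assuming a parsed sensor stream sensor_df with illustrative columns sensor_id, temperature, and an event-time timestamp (none of these names come from the article):

```python
from pyspark.sql.functions import avg, col, window

# Average temperature per sensor over 1-minute windows, tolerating 5 minutes of late data
per_minute = sensor_df \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window(col("timestamp"), "1 minute"), col("sensor_id")) \
    .agg(avg("temperature").alias("avg_temperature"))
```

The resulting per_minute stream could then be written to a time series store with a sink similar to the foreachBatch example shown earlier.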
Customer Support Chat Analysis
Chat platforms like Zendesk capture huge amounts of customer support conversations.
Problem
- Millions of chat messages per month
- Need to understand customer pain points and agent performance
- Must detect negative sentiment and urgent issues
Solution
- Ingest chat transcripts into Kafka topics using a connector
- Aggregate and process using PySpark SQL and DataFrames
- Feed data into NLP models to classify sentiment and intent
- Store insights into the database for historical reporting
- Present real-time dashboards for contact center ops
This demonstrates applying the technologies to real business problems involving massive, fast-moving data.
Learn More
To summarize, we looked at how Python, Kafka, and the cloud provide a great combination for building robust, scalable real-time data pipelines.