Kafka and Spark Structured Streaming in Enterprise: The Patterns That Hold Up Under Pressure
The streaming patterns that survive in the enterprise are those built for scale, failure recovery, and long-term operability.
I've been running Kafka and Spark Structured Streaming together in production for about five years. Not in demo environments or proof-of-concept projects. In systems processing insurance claims, manufacturing telemetry, and financial transaction data, with SLAs and compliance requirements, and people who call you at 2 AM when things break.
There's a version of Kafka plus Spark Structured Streaming that looks elegant in architecture diagrams and falls apart in the first month of production. There's another version that's uglier in places but genuinely reliable. Here is what I've learned about the difference.
Getting Checkpointing Right From the Start
In my experience, checkpointing is non-negotiable for any streaming job that needs recovery. But checkpointing to local disk, which is the easiest configuration, means your streaming job can't recover from a node failure, only from a process restart. Checkpoint location must be on durable shared storage, ADLS Gen2 or equivalent, from the first day in production.
The checkpoint contains the Kafka offsets the query has committed and the state store for stateful operations. Losing it, whether by manually deleting the checkpoint or by changing the checkpoint path (for example, when the path is derived from the query name), will reset your consumer offsets and discard that state. I've seen this happen accidentally twice: once when an engineer thought deleting a stale checkpoint directory was a cleanup operation, and once when a code refactoring changed the query name used as the checkpoint key. Both required manual offset reconstruction from Kafka's own offset storage. Neither was catastrophic, but both were stressful and avoidable.
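One way to guard against both failure modes is to build the checkpoint path in a single place, from an explicit pipeline identifier rather than from the query name, and to reject paths that are not on shared storage. The following is a minimal sketch under those assumptions; `DURABLE_SCHEMES`, `build_checkpoint_path`, and the example storage account are hypothetical names for illustration, not part of Spark.

```python
from urllib.parse import urlparse

# Assumption: URI schemes this team treats as durable shared storage.
DURABLE_SCHEMES = {"abfss", "abfs", "s3a", "gs", "hdfs"}


def build_checkpoint_path(base_uri: str, pipeline_id: str) -> str:
    """Return a stable checkpoint path for a pipeline.

    The path depends only on base_uri and pipeline_id, so renaming the
    query in code does not silently move the checkpoint.
    """
    scheme = urlparse(base_uri).scheme
    if scheme not in DURABLE_SCHEMES:
        raise ValueError(
            f"checkpoint base {base_uri!r} is not on durable shared storage"
        )
    if not pipeline_id or "/" in pipeline_id:
        raise ValueError("pipeline_id must be a non-empty single path segment")
    return f"{base_uri.rstrip('/')}/{pipeline_id}"


if __name__ == "__main__":
    print(build_checkpoint_path(
        "abfss://checkpoints@exampleaccount.dfs.core.windows.net/streaming",
        "claims-hourly-v1",
    ))
```

The returned string is what would be passed to `.option("checkpointLocation", ...)`. Treating the pipeline identifier as configuration rather than deriving it from code makes a refactoring-induced path change much harder to introduce by accident.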
Micro-Batch Sizing for Your Use Case
Spark Structured Streaming processes data in micro-batches. The trigger interval controls how often a micro-batch runs. The default, if you don't specify a trigger, is to run a new batch immediately after the previous one completes. This is correct for high-throughput workloads where you want to process data as fast as possible. It's wrong for moderate-throughput workloads where you want predictable latency and manageable file sizes in your output Delta Lake tables.
For our manufacturing telemetry pipeline (moderate throughput, near-real-time requirement), we use a 30-second trigger. This produces files of roughly 50 to 100MB in the output Delta table, which is manageable with a nightly compaction job. For our insurance claims pipeline (lower throughput, 5-minute SLA), we use a 2-minute trigger.
My rule of thumb: choose a trigger interval that produces output files in the 50 to 500MB range for your throughput. Files significantly smaller than this create compaction debt. Files significantly larger than this create memory pressure during the micro-batch.
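# Trigger interval examples for different workloads
# The *_checkpoint_path and *_output_path names are placeholders:
# each query needs its own checkpoint location and output path

# High throughput: no trigger, so each batch starts as soon as the previous one finishes
# (.trigger(availableNow=True), Spark 3.3+, is different: it processes the backlog, then stops)
high_throughput_query = df.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", high_throughput_checkpoint_path) \
    .start(high_throughput_output_path)

# Moderate throughput (manufacturing telemetry): 30-second batches
telemetry_query = df.writeStream \
    .trigger(processingTime="30 seconds") \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", telemetry_checkpoint_path) \
    .start(telemetry_output_path)

# Low throughput (insurance claims): 2-minute batches
claims_query = df.writeStream \
    .trigger(processingTime="2 minutes") \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", claims_checkpoint_path) \
    .start(claims_output_path)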
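The 50 to 500MB rule of thumb can be turned into simple arithmetic: data written per batch is roughly throughput multiplied by the trigger interval, divided by the number of files each batch writes. The sketch below assumes steady throughput and evenly sized files; the 2.5 MB/s figure is a made-up example, not a measurement from the pipelines described here, and the function names are hypothetical.

```python
def file_size_mb(throughput_mb_per_s: float, trigger_s: float,
                 files_per_batch: int = 1) -> float:
    """Approximate size of each output file for a given trigger interval."""
    return throughput_mb_per_s * trigger_s / files_per_batch


def trigger_range_s(throughput_mb_per_s: float, files_per_batch: int = 1,
                    min_file_mb: float = 50, max_file_mb: float = 500):
    """Trigger intervals (seconds) that keep files inside the target size range."""
    shortest = min_file_mb * files_per_batch / throughput_mb_per_s
    longest = max_file_mb * files_per_batch / throughput_mb_per_s
    return shortest, longest


if __name__ == "__main__":
    # Hypothetical pipeline writing 2.5 MB/s into one file per batch
    print(file_size_mb(2.5, 30))   # 75.0
    print(trigger_range_s(2.5))    # (20.0, 200.0)
```

If the lower bound comes out longer than your latency SLA allows, that is a sign to accept smaller files and compact more often rather than stretch the trigger.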
Kafka Partition Count and Spark Parallelism
By default, each Kafka partition is consumed by one Spark task per micro-batch. If your topic has 8 partitions, Spark uses 8 tasks for the Kafka read stage. If your downstream processing is more CPU-intensive than the Kafka read, you'll want more parallelism downstream. Use repartition() after the Kafka source read to increase parallelism for the heavy processing stages.
In the other direction: if your Kafka topic has 200 partitions because it was sized for high throughput, but your Spark cluster has 32 cores, Spark has to run those 200 tasks in waves across 32 cores, and the per-task scheduling overhead adds up in every micro-batch. Consider whether the partition count on the topic is appropriate for your actual throughput.
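Both directions can be reasoned about with a little arithmetic before touching the cluster. This is a rough sketch: `read_stage_waves` and `widen_after_read` are hypothetical helpers, and the default of two tasks per core is an assumption to tune, not a Spark setting. The second helper only calls the standard `DataFrame.repartition(n)` method on whatever DataFrame it is given.

```python
import math


def read_stage_waves(kafka_partitions: int, total_cores: int) -> int:
    """Number of scheduling waves needed to run one read task per partition."""
    return math.ceil(kafka_partitions / total_cores)


def widen_after_read(kafka_df, total_cores: int, tasks_per_core: int = 2):
    """Repartition after the Kafka read so CPU-heavy stages use every core.

    This adds a shuffle, so it only pays off when downstream work is
    clearly more expensive than the read itself.
    """
    return kafka_df.repartition(total_cores * tasks_per_core)


if __name__ == "__main__":
    print(read_stage_waves(8, 32))    # 1: 24 cores sit idle during the read
    print(read_stage_waves(200, 32))  # 7: the read stage runs in seven waves
```

In a job this would look like `parsed = widen_after_read(kafka_df, total_cores=32)` placed immediately after the source read and before the expensive transformations.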
Stateful Operations and Watermarks
Windowed aggregations and stream-stream joins require Spark to maintain state across micro-batches. Without a watermark, Spark will accumulate state indefinitely, and your executor memory will grow without bound until the job fails. Always define a watermark on your event-time column for any stateful operation.
The watermark threshold is a business decision as much as a technical one. A 10-minute watermark means Spark may discard events whose event time is more than 10 minutes behind the latest event time it has seen. If your source systems can deliver events up to 30 minutes late (common in some IoT and batch-settlement scenarios), a 10-minute watermark will cause late events to be silently dropped. Understand your source latency characteristics before setting the watermark.
# Watermark definition for late-arriving events
from pyspark.sql import functions as F

# kafka_df is the streaming Kafka source; claims_schema describes the JSON payload
claims_parsed = kafka_df \
    .select(F.from_json(F.col("value").cast("string"), claims_schema)
            .alias("data")) \
    .select("data.*") \
    .withWatermark("event_timestamp", "30 minutes")

# Windowed aggregation with watermark
claims_hourly = claims_parsed \
    .groupBy(
        F.window("event_timestamp", "1 hour"),
        "claim_type",
        "region"
    ) \
    .agg(
        F.count("claim_id").alias("claim_count"),
        F.sum("claim_amount").alias("total_amount"),
        F.avg("claim_amount").alias("avg_amount")
    )
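To see why the threshold matters, it helps to replay a few events by hand. The sketch below is a deliberately simplified model, not Spark's implementation: it treats the watermark as the maximum event time seen in earlier batches minus the threshold, and flags any event older than that. Spark's real behavior for windowed aggregations depends on the window end rather than the raw event time, so treat this as intuition only. `late_events` is a hypothetical name.

```python
from datetime import datetime, timedelta


def late_events(batches, threshold: timedelta):
    """Return event times that fall behind the watermark in a simplified model."""
    max_seen = None
    watermark = None
    dropped = []
    for batch in batches:
        # Events are checked against the watermark from earlier batches
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)
        # The watermark advances only after the batch has been processed
        if batch:
            batch_max = max(batch)
            max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
            watermark = max_seen - threshold
    return dropped


if __name__ == "__main__":
    noon = datetime(2024, 1, 1, 12, 0)
    batches = [
        [noon],
        [noon - timedelta(minutes=20), noon - timedelta(minutes=5)],
    ]
    print(len(late_events(batches, timedelta(minutes=10))))  # 1
    print(len(late_events(batches, timedelta(minutes=30))))  # 0
```

With a 10-minute threshold the event that is 20 minutes behind is dropped; with 30 minutes it is kept, at the cost of holding state for longer.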
The Monitoring You Need on Day One
First up: consumer lag per partition. This is the most important streaming metric. Growing lag means your consumer can't keep up with producer throughput, and your latency SLA is in jeopardy.
Second: micro-batch duration. If micro-batch duration exceeds your trigger interval, you have a processing bottleneck. The job is trying to run continuously without keeping up.
Third: state store size for stateful operations. A growing state store is a memory leak waiting to become an OOM failure.
My team emits these three metrics from every streaming job to Azure Monitor. When any of them crosses a threshold, we get an alert before users notice a problem. Setting this up properly at deployment time, not after the first production incident, has saved us from several avoidable outages.
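# Azure Monitor metrics emission from Spark streaming
# Requires PySpark 3.4+ for the Python StreamingQueryListener
from opencensus.ext.azure import metrics_exporter
from opencensus.stats import aggregation, measure, stats as stats_module, view
from opencensus.tags import tag_map
from pyspark.sql.streaming import StreamingQueryListener

class StreamingMetricsListener(StreamingQueryListener):
    METRICS = ("consumer_lag", "batch_duration_ms", "state_store_rows")

    def __init__(self, app_insights_key):
        super().__init__()
        exporter = metrics_exporter.new_metrics_exporter(
            connection_string=f"InstrumentationKey={app_insights_key}")
        view_manager = stats_module.stats.view_manager
        view_manager.register_exporter(exporter)
        # Register one last-value metric per name so the exporter picks it up
        self.measures = {}
        for name in self.METRICS:
            m = measure.MeasureFloat(f"streaming.{name}", name, "1")
            view_manager.register_view(view.View(
                m.name, name, [], m, aggregation.LastValueAggregation()))
            self.measures[name] = m

    def onQueryStarted(self, event):
        pass  # required by the listener interface

    def onQueryProgress(self, event):
        p = event.progress
        # The Kafka source reports lag in its source metrics (Spark 3.2+);
        # endOffset minus startOffset would be the batch size, not the lag
        kafka_metrics = (p.sources[0].metrics or {}) if p.sources else {}
        self.emit("consumer_lag", kafka_metrics.get("maxOffsetsBehindLatest", 0))
        self.emit("batch_duration_ms", p.batchDuration)
        self.emit("state_store_rows", p.stateOperators[0].numRowsTotal
                  if p.stateOperators else 0)

    def onQueryTerminated(self, event):
        pass  # required by the listener interface

    def emit(self, name, value):
        # Record through OpenCensus; the registered exporter sends it to Azure Monitor
        mmap = stats_module.stats.stats_recorder.new_measurement_map()
        mmap.measure_float_put(self.measures[name], float(value))
        mmap.record(tag_map.TagMap())

spark.streams.addListener(StreamingMetricsListener(AI_KEY))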
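The alert conditions described above (growing lag, batches slower than the trigger, growing state) can be expressed as a small check over the most recent batches. This is a minimal sketch with hypothetical names and thresholds; in practice the rules would live in the monitoring system rather than in the job, and "strictly growing over three batches" is an assumption to tune per pipeline.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class BatchMetrics:
    consumer_lag: int        # offsets behind the latest available offset
    batch_duration_ms: int   # time taken to process the micro-batch
    state_store_rows: int    # rows held in state for stateful operators


def _strictly_growing(values: List[int]) -> bool:
    return len(values) >= 2 and all(b > a for a, b in zip(values, values[1:]))


def check_alerts(history: List[BatchMetrics], trigger_interval_ms: int,
                 window: int = 3) -> List[str]:
    """Return the names of alert conditions met by the most recent batches."""
    recent = history[-window:]
    if not recent:
        return []
    alerts = []
    full_window = len(recent) == window
    if full_window and _strictly_growing([m.consumer_lag for m in recent]):
        alerts.append("consumer_lag_growing")
    if recent[-1].batch_duration_ms > trigger_interval_ms:
        alerts.append("batch_slower_than_trigger")
    if full_window and _strictly_growing([m.state_store_rows for m in recent]):
        alerts.append("state_store_growing")
    return alerts


if __name__ == "__main__":
    history = [
        BatchMetrics(100, 20_000, 500),
        BatchMetrics(250, 28_000, 500),
        BatchMetrics(900, 34_000, 500),
    ]
    print(check_alerts(history, trigger_interval_ms=30_000))
```

Looking at a short window rather than a single batch avoids paging someone for one slow batch while still catching a sustained trend early.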