Dead Letter Queue Patterns in Apache Flink: Handling Poison Messages Without Stopping Your Stream
A poison message can trap a Flink job in a restart loop. Use side outputs, retries, tiered DLQs, durable sinks, and replay jobs to keep the stream running.
Streaming systems usually fail in one of two ways:
- Loudly, when infrastructure breaks
- Quietly, when one bad record keeps replaying until the pipeline is effectively dead
The second failure mode is more dangerous because it often starts with something small: malformed JSON, an unexpected schema change, a missing required field, or a downstream timeout that was never handled correctly.
In Apache Flink, one unhandled exception can trigger a restart. If the same poison message is still sitting in Kafka after recovery, the job reads it again, fails again, restarts again, and enters a loop. At that point, the pipeline is technically "recovering," but operationally it is down.
This is exactly why production Flink jobs need a Dead Letter Queue (DLQ) strategy from day one.
A proper DLQ pattern does three things:
- Isolates bad records so they do not stop good ones
- Captures enough failure context to debug the issue later
- Preserves replayability so quarantined records can be reprocessed after the root cause is fixed
Anything less is not really a DLQ. It is either silent data loss or a delayed outage.
In this article, I will walk through the most practical DLQ patterns for Apache Flink 1.18:
- Side outputs as the core DLQ primitive
- Retry with exponential backoff for transient failures
- Tiered DLQ routing by error class
- Kafka and S3 sink patterns
- Metrics and alerting
- Replay with a dedicated reprocessing job
- A PyFlink version of the side output pattern
The goal is simple: a bad message should never silently disappear, and it should never silently stop the stream.
Why Poison Messages Break Otherwise Healthy Pipelines
A poison message is any record that consistently fails processing.
Typical examples include:
- Malformed JSON
- Incompatible schema versions
- Missing required fields
- Invalid business values
- Records that trigger unexpected code paths
- Messages that repeatedly fail downstream enrichment calls
Without DLQ handling, the failure path usually looks like this:
- The record enters the pipeline
- Deserialization or validation throws an exception
- The operator fails
- Flink restarts from the last checkpoint
- The same record is consumed again
- The same exception happens again
That loop can continue indefinitely.
The result is predictable:
- Throughput drops to zero
- Downstream consumers starve
- Checkpoint recovery does not help
- On-call engineers get paged for a problem caused by one record
This is why DLQ handling is not just an error-handling convenience. It is a core reliability pattern.
What a DLQ Should Look Like in Flink
In a streaming architecture, a DLQ is a durable destination for records that could not be processed successfully.
For Flink, that means the DLQ record should usually include:
- Raw payload
- Error type
- Error message
- Stack trace or summarized failure context
- Failure timestamp
- Source metadata such as topic, partition, or offset when available
That information matters because a DLQ is only useful if someone can answer two questions later:
- Why did this record fail?
- How do I replay it safely once the issue is fixed?
If you only log the exception, you lose replayability. If you only store the payload, you lose debugging context. If you drop the record entirely, you lose both.
So the design target is not "catch exceptions." The design target is durable, observable, replayable failure handling.
Pattern 1: Use Side Outputs as the Core DLQ Primitive
The most natural DLQ mechanism in Flink is the side output.
A side output allows one operator to emit records to multiple streams:
- The main stream for successful records
- One or more side streams for failures, late data, or quarantined records
That makes it the right primitive for DLQ routing.
Define the DLQ Envelope and Output Tag
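```java
import org.apache.flink.util.OutputTag;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Shared declarations, e.g. static members of your job class.
// The anonymous subclass ({}) preserves the generic type, so Flink knows
// the side output carries DeadLetterRecord values.
public static final OutputTag<DeadLetterRecord> DLQ_TAG =
        new OutputTag<DeadLetterRecord>("dead-letter-queue") {};

// Note: Flink 1.18 does not treat records as POJOs (no public no-arg constructor),
// so this type falls back to generic serialization. Verify it serializes in your
// setup, or use a POJO instead.
public record DeadLetterRecord(
        String rawPayload,
        String errorType,
        String errorMessage,
        String stackTrace,
        long failedAtEpochMs,
        String sourceTopicPartition,  // "unknown" when Kafka metadata is not available
        long sourceOffset             // -1 when the offset is not available
) {}
```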
The important point here is that the DLQ record is not just the failed payload. It is an envelope that preserves enough context for triage and replay.
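Stack traces deserve a size limit. A trace with nested causes can run to many kilobytes, the envelope also carries the raw payload, and Kafka rejects messages over about 1 MB by default. The sketch below uses only the JDK; the StackTraces class, the truncation marker, and capping by character count are illustrative assumptions rather than part of the original pattern.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper: renders a Throwable (including "Caused by:" sections)
// as text, truncated to maxChars so one oversized trace cannot bloat a DLQ record.
final class StackTraces {

    private static final String MARKER = "... [truncated]";

    private StackTraces() {}

    static String summarize(Throwable t, int maxChars) {
        if (maxChars < MARKER.length()) {
            throw new IllegalArgumentException(
                    "maxChars must be at least " + MARKER.length());
        }
        StringWriter buffer = new StringWriter();
        // printStackTrace writes the type, message, frames, and the cause chain
        t.printStackTrace(new PrintWriter(buffer, true));
        String full = buffer.toString();
        if (full.length() <= maxChars) {
            return full;
        }
        // Keep the head of the trace and mark the cut so readers know it is partial
        return full.substring(0, maxChars - MARKER.length()) + MARKER;
    }
}
```

Calling StackTraces.summarize(e, 8_192) wherever the examples fill the stackTrace field keeps the exception type, message, and the frames closest to the failure. The 8,192-character cap is an arbitrary example value. Root causes appear at the end of a trace under "Caused by:", so a tight cap can cut them off; size the limit with that in mind.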
Route Failures Inside a ProcessFunction
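```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;

public class EntityEventProcessor
        extends ProcessFunction<String, EntityEvent> {

    // Thread-safe once configured; static so it is not serialized with the function.
    // Deserializing Java records requires Jackson 2.12 or later.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void processElement(
            String rawMessage,
            Context ctx,
            Collector<EntityEvent> out) {
        try {
            EntityEvent event = parseAndValidate(rawMessage);
            out.collect(event);
        } catch (JsonProcessingException e) {
            // Malformed JSON or a payload that cannot be mapped to EntityEvent
            ctx.output(DLQ_TAG, deadLetter(rawMessage, "JSON_PARSE_FAILURE", e));
        } catch (SchemaValidationException e) {
            ctx.output(DLQ_TAG, deadLetter(rawMessage, "SCHEMA_VALIDATION_FAILURE", e));
        } catch (Exception e) {
            // Catch-all: an unexpected bug quarantines one record instead of
            // restarting the whole job
            ctx.output(DLQ_TAG, deadLetter(rawMessage, "UNKNOWN_FAILURE", e));
        }
    }

    private static DeadLetterRecord deadLetter(
            String raw, String errorType, Exception e) {
        return new DeadLetterRecord(
                raw,
                errorType,
                e.getMessage(),
                ExceptionUtils.stringifyException(e),
                System.currentTimeMillis(),
                // ProcessFunction.Context does not expose Kafka metadata. To fill these
                // fields, deserialize with a KafkaRecordDeserializationSchema that keeps
                // topic, partition, and offset alongside the payload.
                "unknown",
                -1L);
    }

    private EntityEvent parseAndValidate(String raw)
            throws JsonProcessingException, SchemaValidationException {
        EntityEvent event = MAPPER.readValue(raw, EntityEvent.class);
        if (event.entityId() == null || event.entityId().isBlank()) {
            throw new SchemaValidationException("entityId is required");
        }
        if (event.timestamp() <= 0) {
            throw new SchemaValidationException("timestamp must be positive");
        }
        return event;
    }
}
```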
This is the minimum viable DLQ pattern, and it already solves the most important operational problem: bad records no longer stop good ones.
Wire the Main Stream and DLQ Stream
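```java
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

// buildKafkaSource() and buildDownstreamKafkaSink() stand in for your own builders
DataStream<String> kafkaSource = env
        .fromSource(buildKafkaSource(), WatermarkStrategy.noWatermarks(),
                "entity-events-source");

SingleOutputStreamOperator<EntityEvent> processed =
        kafkaSource.process(new EntityEventProcessor());

// Main output: records that parsed and validated
DataStream<EntityEvent> goodEvents = processed;
// Side output: everything routed to DLQ_TAG
DataStream<DeadLetterRecord> deadLetters =
        processed.getSideOutput(DLQ_TAG);

goodEvents.sinkTo(buildDownstreamKafkaSink());
// buildDlqKafkaSink is defined in Pattern 4; the topic name is illustrative
deadLetters.sinkTo(buildDlqKafkaSink("dlq.entity-events"));

env.execute("Entity Resolution Pipeline");
```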
If you do nothing else, do this. Side outputs should be the default DLQ foundation in Flink.
Pattern 2: Retry Transient Failures Before Escalating to DLQ
Not every failure belongs in the DLQ immediately.
Some failures are transient:
- A downstream service is temporarily unavailable
- A database call times out
- An external API is rate-limited
- A network dependency is briefly unstable
If you send all of those directly to the DLQ, you create noise and bury the truly bad records.
The better pattern is:
- Retry transient failures a limited number of times
- Use exponential backoff
- Escalate to DLQ only after retries are exhausted
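With the values used in the retry example (a 500 ms base delay and three retries), a record is tried once immediately and then retried after 500 ms, 1 s, and 2 s before it is escalated. Pulling that arithmetic into a plain class makes it easy to unit-test without a Flink runtime. A minimal sketch; BackoffPolicy and the maximum-delay cap are assumptions added here, not part of the original code:

```java
// Hypothetical helper isolating the retry arithmetic from Flink:
// delay = base * 2^retriesSoFar, never exceeding maxDelayMs.
final class BackoffPolicy {

    private final long baseDelayMs;
    private final long maxDelayMs;
    private final int maxRetries;

    BackoffPolicy(long baseDelayMs, long maxDelayMs, int maxRetries) {
        if (baseDelayMs <= 0 || maxDelayMs < baseDelayMs || maxRetries < 0) {
            throw new IllegalArgumentException("invalid backoff settings");
        }
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.maxRetries = maxRetries;
    }

    /** True once the record has used up its retries and belongs in the DLQ. */
    boolean exhausted(int retriesSoFar) {
        return retriesSoFar >= maxRetries;
    }

    /** Delay before the next attempt, given how many retries already happened. */
    long delayMs(int retriesSoFar) {
        if (exhausted(retriesSoFar)) {
            throw new IllegalStateException("no retries left; route to the DLQ");
        }
        long delay = baseDelayMs;
        for (int i = 0; i < retriesSoFar; i++) {
            // Double without overflowing: jump to the cap once doubling would pass it
            delay = delay > maxDelayMs / 2 ? maxDelayMs : delay * 2;
        }
        return delay;
    }
}
```

In the KeyedProcessFunction, exhausted(retries) would replace the MAX_RETRIES comparison and delayMs(retries) the backoff calculation. The cap matters only if you raise the retry count: without it, ten retries at a 500 ms base would already wait over four minutes before the last attempt.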
Retry With KeyedProcessFunction and Timers
```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;

import java.util.ArrayList;
import java.util.List;

public class RetryingEnrichmentProcessor
        extends KeyedProcessFunction<String, EntityEvent, EnrichedEvent> {

    private static final int MAX_RETRIES = 3;
    private static final long BASE_BACKOFF_MS = 500L;
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Retries already spent on the event at the head of the queue.
    // Non-null only while a retry timer is pending for this key.
    private transient ValueState<Integer> retryCountState;
    // Events for this key awaiting a retry. The head is the one being retried;
    // later events wait behind it instead of overwriting it.
    private transient ListState<EntityEvent> pendingEventsState;

    @Override
    public void open(Configuration parameters) {
        retryCountState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("retry-count", Integer.class));
        pendingEventsState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-events", EntityEvent.class));
    }

    @Override
    public void processElement(
            EntityEvent event,
            Context ctx,
            Collector<EnrichedEvent> out) throws Exception {
        if (retryCountState.value() != null) {
            // A retry is already scheduled for this key; queue to keep per-key order
            pendingEventsState.add(event);
            return;
        }
        List<EntityEvent> queue = new ArrayList<>(List.of(event));
        drain(queue, ctx, ctx.timerService().currentProcessingTime(), out);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<EnrichedEvent> out) throws Exception {
        List<EntityEvent> queue = new ArrayList<>();
        pendingEventsState.get().forEach(queue::add);
        drain(queue, ctx, timestamp, out);
    }

    // Tries queued events in order until one needs a delayed retry.
    private void drain(
            List<EntityEvent> queue,
            Context ctx,
            long now,
            Collector<EnrichedEvent> out) throws Exception {
        Integer stored = retryCountState.value();
        int retries = stored == null ? 0 : stored;
        while (!queue.isEmpty()) {
            EntityEvent head = queue.get(0);
            try {
                out.collect(callEnrichmentService(head));
            } catch (TransientServiceException e) {
                if (retries < MAX_RETRIES) {
                    // Exponential backoff: 500 ms, 1 s, 2 s
                    long backoffMs = BASE_BACKOFF_MS * (1L << retries);
                    retryCountState.update(retries + 1);
                    pendingEventsState.update(queue);
                    ctx.timerService().registerProcessingTimeTimer(now + backoffMs);
                    return;
                }
                ctx.output(DLQ_TAG, deadLetter(head, "MAX_RETRIES_EXCEEDED",
                        "Failed after " + MAX_RETRIES + " retries: " + e.getMessage(),
                        e, ctx));
            } catch (PoisonMessageException e) {
                // Retrying cannot help: quarantine immediately
                ctx.output(DLQ_TAG, deadLetter(head, "POISON_MESSAGE",
                        e.getMessage(), e, ctx));
            }
            // The head succeeded or was dead-lettered; move on to the next event
            queue.remove(0);
            retries = 0;
        }
        retryCountState.clear();
        pendingEventsState.clear();
    }

    private DeadLetterRecord deadLetter(
            EntityEvent event, String errorType, String message,
            Exception e, Context ctx) throws Exception {
        return new DeadLetterRecord(
                // JSON rather than toString(), so the replay job can parse it again
                MAPPER.writeValueAsString(event),
                errorType,
                message,
                ExceptionUtils.stringifyException(e),
                System.currentTimeMillis(),
                // No Kafka metadata at this stage; record the key to aid triage
                "key=" + ctx.getCurrentKey(),
                -1L);
    }
}
```
Why This Works Especially Well in Flink
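This pattern fits Flink especially well because keyed state and timers are included in checkpoints.
That means:
- Retry counters survive restarts
- Pending events survive restarts
- Scheduled retries resume after recovery, and processing-time timers that came due while the job was down fire immediately on restore
In other words, the retry workflow itself is fault-tolerant.
That is exactly what you want when handling transient failures in a long-running stream. One caveat: after a failure, state rolls back to the last completed checkpoint, so an enrichment call made after that checkpoint may run again. Keep the enrichment call safe to repeat.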
Pattern 3: Split the DLQ by Failure Type
Once a pipeline matures, a single DLQ topic usually becomes too coarse.
Schema failures, business validation failures, exhausted retries, and unknown exceptions all end up mixed together. That makes triage slower and replay harder.
A better pattern is to classify failures and route them to separate DLQ streams.
Define Failure Tiers
```java
public enum DlqTier {
    TRANSIENT_EXHAUSTED,
    SCHEMA_INVALID,
    BUSINESS_RULE,
    UNKNOWN
}
```
Route by Exception Class
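```java
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import org.apache.flink.util.ExceptionUtils;

public class TieredDlqRouter
        extends ProcessFunction<String, EntityEvent> {

    @Override
    public void processElement(
            String raw, Context ctx, Collector<EntityEvent> out) {
        try {
            EntityEvent event = parse(raw);
            validate(event);
            out.collect(event);
        } catch (JsonParseException | JsonMappingException e) {
            // Malformed JSON, or JSON that does not map onto EntityEvent
            route(ctx, raw, DlqTier.SCHEMA_INVALID, e);
        } catch (BusinessValidationException e) {
            route(ctx, raw, DlqTier.BUSINESS_RULE, e);
        } catch (Exception e) {
            route(ctx, raw, DlqTier.UNKNOWN, e);
        }
    }

    private void route(Context ctx, String raw,
                       DlqTier tier, Exception e) {
        ctx.output(getTierTag(tier), new DeadLetterRecord(
                raw, tier.name(), e.getMessage(),
                ExceptionUtils.stringifyException(e), System.currentTimeMillis(),
                "unknown", -1L
        ));
    }

    private static OutputTag<DeadLetterRecord> getTierTag(DlqTier tier) {
        return switch (tier) {
            case SCHEMA_INVALID -> DLQ_SCHEMA;
            case BUSINESS_RULE -> DLQ_BUSINESS;
            // This router does not retry; TRANSIENT_EXHAUSTED records come from the
            // retry operator, so anything else falls into the unknown tier here
            case TRANSIENT_EXHAUSTED, UNKNOWN -> DLQ_UNKNOWN;
        };
    }
}
```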
Define One Output Tag Per Tier
```java
// Explicit type arguments, matching DLQ_TAG, so Flink can extract the side output type
public static final OutputTag<DeadLetterRecord> DLQ_SCHEMA =
        new OutputTag<DeadLetterRecord>("dlq-schema-invalid") {};
public static final OutputTag<DeadLetterRecord> DLQ_BUSINESS =
        new OutputTag<DeadLetterRecord>("dlq-business-rule") {};
public static final OutputTag<DeadLetterRecord> DLQ_UNKNOWN =
        new OutputTag<DeadLetterRecord>("dlq-unknown") {};
```
Sink Each Tier Independently
```java
SingleOutputStreamOperator<EntityEvent> processed =
        kafkaSource.process(new TieredDlqRouter());

// One DLQ topic per tier; buildDlqKafkaSink is defined in Pattern 4
processed.getSideOutput(DLQ_SCHEMA)
        .sinkTo(buildDlqKafkaSink("dlq.schema-invalid"));
processed.getSideOutput(DLQ_BUSINESS)
        .sinkTo(buildDlqKafkaSink("dlq.business-rule"));
processed.getSideOutput(DLQ_UNKNOWN)
        .sinkTo(buildDlqKafkaSink("dlq.unknown"));
```
This makes the DLQ operationally useful instead of just technically correct.
For example:
- Schema failures can be routed to the producer team
- Business rule failures can feed data quality workflows
- Unknown failures can trigger higher-severity alerting
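One limitation of routing with catch blocks is that they only match the outermost exception. If validation runs inside a helper or library that wraps failures, for example in a RuntimeException, a schema problem lands in the UNKNOWN tier. A minimal JDK-only sketch that walks the cause chain instead; the exception classes are stand-ins for the article's types, DlqTier is repeated so the block compiles on its own, and checking schema before business rules is an assumed precedence:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

enum DlqTier { TRANSIENT_EXHAUSTED, SCHEMA_INVALID, BUSINESS_RULE, UNKNOWN }

// Hypothetical stand-ins for the article's exception types
class SchemaValidationException extends Exception {
    SchemaValidationException(String message) { super(message); }
}

class BusinessValidationException extends Exception {
    BusinessValidationException(String message) { super(message); }
}

final class DlqClassifier {

    private DlqClassifier() {}

    /** Returns the tier of the first recognized exception in the cause chain. */
    static DlqTier classify(Throwable error) {
        // Throwable only rejects direct self-causation, so guard against longer cycles
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = error; t != null && seen.add(t); t = t.getCause()) {
            if (t instanceof SchemaValidationException) {
                return DlqTier.SCHEMA_INVALID;
            }
            if (t instanceof BusinessValidationException) {
                return DlqTier.BUSINESS_RULE;
            }
        }
        return DlqTier.UNKNOWN;
    }
}
```

The router's catch-all branch could call DlqClassifier.classify(e) instead of hard-coding UNKNOWN. With Jackson on the classpath, a JsonProcessingException check would sit beside the schema check.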
Pattern 4: Choose DLQ Sinks Based on How You Plan To Recover
Once records are routed to a DLQ stream, they need a durable destination. In practice, the two most common choices are Kafka and object storage.
Kafka DLQ Sink
Kafka is the right choice when you want:
- Near-real-time inspection
- Streaming replay
- Operational integration with existing consumers
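```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.json.JsonSerializationSchema;

import java.nio.charset.StandardCharsets;

private static KafkaSink<DeadLetterRecord> buildDlqKafkaSink(
        String topicName) {
    return KafkaSink.<DeadLetterRecord>builder()
            .setBootstrapServers("kafka-broker:9092")
            .setRecordSerializer(
                    KafkaRecordSerializationSchema.<DeadLetterRecord>builder()
                            .setTopic(topicName)
                            // flink-json's JsonSerializationSchema takes no class argument
                            .setValueSerializationSchema(
                                    new JsonSerializationSchema<DeadLetterRecord>())
                            // Keying by error type keeps each failure class on a stable partition
                            .setKeySerializationSchema(
                                    (DeadLetterRecord record) ->
                                            record.errorType().getBytes(StandardCharsets.UTF_8))
                            .build()
            )
            // A duplicate DLQ record is tolerable; a lost one is not
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
}
```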
S3 DLQ Sink
Object storage is the better choice when you want:
- Long retention
- Low-cost quarantine
- Batch querying and replay with tools such as Athena or Spark
- Partitioned storage by date or error type
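```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;
import java.time.ZoneId;

private static final ObjectMapper MAPPER = new ObjectMapper();

// FileSink finalizes part files only on successful checkpoints,
// so checkpointing must be enabled for DLQ files to become visible.
private static FileSink<DeadLetterRecord> buildS3DlqSink() {
    // One JSON object per line; Flink ships no JSON row encoder, so write one
    Encoder<DeadLetterRecord> jsonLines = (record, stream) -> {
        stream.write(MAPPER.writeValueAsBytes(record));
        stream.write('\n');
    };
    return FileSink
            .forRowFormat(new Path("s3://your-bucket/dlq/entity-resolution/"), jsonLines)
            .withRollingPolicy(
                    DefaultRollingPolicy.builder()
                            .withRolloverInterval(Duration.ofMinutes(15))
                            .withInactivityInterval(Duration.ofMinutes(5))
                            .withMaxPartSize(MemorySize.ofMebiBytes(128))
                            .build()
            )
            // Literal text must be quoted in a DateTimeFormatter pattern. This assigner
            // partitions by hour only; partitioning by error type needs a custom BucketAssigner.
            .withBucketAssigner(
                    new DateTimeBucketAssigner<>(
                            "'year='yyyy'/month='MM'/day='dd'/hour='HH", ZoneId.of("UTC"))
            )
            .build();
}
```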
A practical production pattern is to use:
- Kafka for short-term operational handling
- S3 for long-term quarantine and replay
That gives you both fast response and durable history.
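DateTimeBucketAssigner only formats the current processing time into the bucket path, so it cannot partition by error type. A custom BucketAssigner can derive the path from the record instead. The JDK-only helper below sketches that path logic; DlqBucketPaths, the Hive-style key=value layout, and bucketing on failedAtEpochMs (failure time rather than write time) are assumptions:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper: builds a bucket path such as
// error-type=schema_invalid/year=2024/month=05/day=01/hour=13
final class DlqBucketPaths {

    private static final DateTimeFormatter HOURLY = DateTimeFormatter
            .ofPattern("'year='yyyy'/month='MM'/day='dd'/hour='HH")
            .withZone(ZoneOffset.UTC);

    private DlqBucketPaths() {}

    static String bucketFor(String errorType, long failedAtEpochMs) {
        // Keep path segments predictable: lowercase, only [a-z0-9_-]
        String safeType = errorType == null || errorType.isBlank()
                ? "unknown"
                : errorType.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9_-]", "_");
        return "error-type=" + safeType + "/"
                + HOURLY.format(Instant.ofEpochMilli(failedAtEpochMs));
    }
}
```

Inside a BucketAssigner<DeadLetterRecord, String>, getBucketId could return DlqBucketPaths.bucketFor(record.errorType(), record.failedAtEpochMs()), with getSerializer() returning SimpleVersionedStringSerializer.INSTANCE. Bucketing on failure time keeps a date-range replay aligned with when records actually failed.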
Pattern 5: Monitor DLQ Rate, Not Just Job Uptime
A DLQ that nobody watches is just a backlog with better branding.
Job uptime alone is not enough. A Flink job can stay green while quietly routing 10% of traffic to the DLQ.
That is still a production incident.
Add Metrics Inside the Operator
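```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;

public class MonitoredEntityEventProcessor
        extends ProcessFunction<String, EntityEvent> {

    private transient Counter dlqCounter;
    private transient Counter successCounter;
    private transient Histogram processingLatency;

    @Override
    public void open(Configuration parameters) {
        MetricGroup metrics = getRuntimeContext()
                .getMetricGroup()
                .addGroup("entity_resolution");
        dlqCounter = metrics.counter("dlq_routed_total");
        successCounter = metrics.counter("processed_success_total");
        // Keeps the last 1,000 samples for percentile calculations
        processingLatency = metrics.histogram(
                "processing_latency_us",
                new DescriptiveStatisticsHistogram(1000)
        );
    }

    @Override
    public void processElement(
            String raw, Context ctx, Collector<EntityEvent> out) {
        long startNanos = System.nanoTime();
        try {
            // parseAndValidate as in EntityEventProcessor; buildDeadLetter is assumed
            // to build the DeadLetterRecord envelope shown earlier
            EntityEvent event = parseAndValidate(raw);
            successCounter.inc();
            out.collect(event);
        } catch (Exception e) {
            dlqCounter.inc();
            ctx.output(DLQ_TAG, buildDeadLetter(raw, e));
        } finally {
            // Parsing usually takes well under a millisecond, so record microseconds
            processingLatency.update((System.nanoTime() - startNanos) / 1_000);
        }
    }
}
```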
Alert on DLQ Rate
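A useful alert is DLQ throughput relative to total throughput (DLQ plus successful records):
```yaml
- alert: FlinkDlqRateHigh
  # Metric names assume Flink's Prometheus reporter with default scope formats,
  # which prefix operator metrics with flink_taskmanager_job_task_operator_.
  # Check the names your reporter actually exposes before deploying.
  expr: |
    sum(rate(flink_taskmanager_job_task_operator_entity_resolution_dlq_routed_total[5m]))
    /
    (
      sum(rate(flink_taskmanager_job_task_operator_entity_resolution_dlq_routed_total[5m]))
      + sum(rate(flink_taskmanager_job_task_operator_entity_resolution_processed_success_total[5m]))
    )
    > 0.01
  for: 2m
  labels:
    severity: warning
  annotations:
    summary: "DLQ rate exceeds 1% of total throughput"
    description: "Check the dlq.* topics, starting with dlq.schema-invalid, for upstream schema changes"
```
As a rule of thumb:
- Above 1% often indicates schema drift or producer issues
- Above 5% usually indicates a broader systemic problem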
The exact thresholds depend on the pipeline, but the principle does not: monitor DLQ rate as a first-class health signal.
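The denominator matters. Dividing DLQ throughput by successful throughput overstates the share as failures grow: at 50 failed and 50 successful records per second, that ratio is 100%, while the share of total traffic is 50%. A small sketch of the threshold logic against total throughput, using the 1% and 5% figures above as assumed defaults; DlqHealth is a hypothetical name:

```java
// Hypothetical helper mirroring the alert: DLQ share of all processed records.
final class DlqHealth {

    enum Level { OK, WARNING, CRITICAL }

    private DlqHealth() {}

    /** Fraction of records routed to the DLQ; 0 when nothing was processed. */
    static double dlqRatio(double dlqPerSecond, double successPerSecond) {
        double total = dlqPerSecond + successPerSecond;
        return total <= 0 ? 0.0 : dlqPerSecond / total;
    }

    static Level classify(double dlqPerSecond, double successPerSecond) {
        double ratio = dlqRatio(dlqPerSecond, successPerSecond);
        if (ratio > 0.05) {
            return Level.CRITICAL;  // likely a broader systemic problem
        }
        if (ratio > 0.01) {
            return Level.WARNING;   // often schema drift or producer issues
        }
        return Level.OK;
    }
}
```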
Pattern 6: Replay With a Dedicated Reprocessing Job
A DLQ is only complete when replay is possible.
The cleanest design is a separate Flink job that reads from the DLQ topic and routes records back through the main processing logic.
Example Replay Job
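```java
import org.apache.flink.api.common.typeinfo.Types;

import java.time.Instant;

public class DlqReprocessingJob {

    public static void main(String[] args) throws Exception {
        // Replay window as two ISO-8601 instants, e.g. 2024-05-01T00:00:00Z
        final long startEpochMs = Instant.parse(args[0]).toEpochMilli();
        final long endEpochMs = Instant.parse(args[1]).toEpochMilli();

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // buildDlqKafkaSource stands in for a KafkaSource that deserializes
        // DeadLetterRecord JSON. Making it bounded, e.g. with
        // setBounded(OffsetsInitializer.latest()), lets the job finish at the end of the topic.
        DataStream<DeadLetterRecord> dlqStream = env
                .fromSource(
                        buildDlqKafkaSource("dlq.schema-invalid"),
                        WatermarkStrategy.noWatermarks(),
                        "dlq-source"
                );

        DataStream<String> replayStream = dlqStream
                .filter(r -> r.failedAtEpochMs() >= startEpochMs
                        && r.failedAtEpochMs() <= endEpochMs)
                .map(DeadLetterRecord::rawPayload)
                .returns(Types.STRING);

        // Same processor as the main pipeline, so the fixed code path applies on replay
        SingleOutputStreamOperator<EntityEvent> reprocessed =
                replayStream.process(new EntityEventProcessor());

        reprocessed.sinkTo(buildDownstreamKafkaSink());

        // Records that fail again are parked for good instead of looping back into replay
        reprocessed.getSideOutput(DLQ_TAG)
                .sinkTo(buildDlqKafkaSink("dlq.permanent-quarantine"));

        env.execute("DLQ Reprocessing Job");
    }
}
```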
Why Replay Should Be a Separate Job
Keeping replay separate from the main pipeline gives you:
- Independent scaling
- Independent scheduling
- Cleaner checkpoint behavior
- Safer operational control
It also lets you drain backlogs on your own terms:
- Off-peak hours
- Reduced parallelism
- Or maximum parallelism when you need to catch up quickly
That separation keeps the main pipeline stable while still making recovery practical.
PyFlink Version: Same Pattern, Same Principle
If your team uses PyFlink, the same side output pattern applies.
```python
import time

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessFunction
from pyflink.datastream.output_tag import OutputTag

# Field order here must match the positional Row built below
DLQ_TAG = OutputTag(
    "dead-letter-queue",
    Types.ROW_NAMED(
        ["raw_payload", "error_type", "error_message", "failed_at_ms"],
        [Types.STRING(), Types.STRING(), Types.STRING(), Types.LONG()]
    )
)


class EntityEventProcessor(ProcessFunction):

    def process_element(self, value, ctx):
        try:
            # parse_and_validate is your own helper; it must return a str here
            # because the main output is declared as Types.STRING() below
            event = parse_and_validate(value)
            yield event
        except Exception as e:
            # Yielding a (tag, value) tuple sends the record to the side output
            yield DLQ_TAG, Row(
                str(value),
                type(e).__name__,
                str(e),
                int(time.time() * 1000),
            )


env = StreamExecutionEnvironment.get_execution_environment()

source_stream = env.from_source(...)

processed = source_stream.process(
    EntityEventProcessor(),
    output_type=Types.STRING()
)

good_events = processed
dead_letters = processed.get_side_output(DLQ_TAG)

good_events.sink_to(build_downstream_sink())
dead_letters.sink_to(build_dlq_sink())

env.execute("Entity Resolution Pipeline")
```
The syntax changes, but the design principle stays the same: good records continue, bad records are isolated and persisted.
Production Checklist
Before shipping a Flink pipeline, verify the following:
| Requirement | Why It Matters |
|---|---|
| Risky operators wrapped in try/catch | Prevents restart loops from unhandled exceptions |
| DLQ output tags use explicit typing | Avoids runtime serialization failures |
| DLQ sink is durable | Failed records must survive restarts |
| DLQ metrics are exported | Silent DLQ growth is otherwise invisible |
| Replay path exists and is tested | A DLQ without replay is just storage |
| DLQ retention is long enough | Teams need time to diagnose and replay |
| Permanent quarantine exists | Prevents infinite replay loops |
| Alerting is based on DLQ rate | Job health alone is not enough |
This checklist is worth automating in code review or deployment readiness checks. DLQ handling is too important to leave to convention.
Key Takeaways
If you are building Flink pipelines in production, the safest default is:
- Use side outputs for DLQ routing
- Retry transient failures before escalation
- Classify failures into separate DLQ streams
- Sink DLQ records durably
- Export DLQ metrics
- Replay through a dedicated job
The core rule is simple:
A bad message should never silently disappear, and it should never silently stop the stream.
That is what turns DLQ handling from a defensive coding trick into a real reliability pattern.
Environment Notes
The examples in this article target:
- Apache Flink 1.18
- Java 17
- PyFlink 1.18
A few implementation notes:
- The retry timer pattern requires a keyed stream (keyBy) before the KeyedProcessFunction
- RocksDB is usually the safer state backend for larger retry state
- The HashMap state backend can work well for smaller, latency-sensitive workloads
- AT_LEAST_ONCE is usually sufficient for DLQ sinks
Final Thoughts
Poison messages are not rare in streaming systems. They are inevitable.
The real question is whether one bad record can take down an otherwise healthy pipeline.
With the right DLQ design in Flink, the answer becomes no.
The stream keeps moving. Good records continue. Bad records are quarantined. Alerts fire. Replay remains possible. And the pipeline stays operational while the root cause is fixed.
That is the difference between a stream that works in staging and one that survives production.
Opinions expressed by DZone contributors are their own.