Dead Letter Queue Patterns in Apache Flink: Handling Poison Messages Without Stopping Your Stream

A poison message can trap a Flink job in a restart loop. Use side outputs, retries, tiered DLQs, durable sinks, and replay jobs to keep the stream running.

By Rohit Muthyala · Jul. 02, 26 · Analysis

Streaming systems usually fail in one of two ways:

  • Loudly, when infrastructure breaks
  • Quietly, when one bad record keeps replaying until the pipeline is effectively dead

The second failure mode is more dangerous because it often starts with something small: malformed JSON, an unexpected schema change, a missing required field, or a downstream timeout that was never handled correctly.

In Apache Flink, one unhandled exception can trigger a restart. If the same poison message is still sitting in Kafka after recovery, the job reads it again, fails again, restarts again, and enters a loop. At that point, the pipeline is technically "recovering," but operationally it is down.

This is exactly why production Flink jobs need a Dead Letter Queue (DLQ) strategy from day one.

A proper DLQ pattern does three things:

  1. Isolates bad records so they do not stop good ones
  2. Captures enough failure context to debug the issue later
  3. Preserves replayability so quarantined records can be reprocessed after the root cause is fixed

Anything less is not really a DLQ. It is either silent data loss or a delayed outage.

In this article, I will walk through the most practical DLQ patterns for Apache Flink 1.18:

  • Side outputs as the core DLQ primitive
  • Retry with exponential backoff for transient failures
  • Tiered DLQ routing by error class
  • Kafka and S3 sink patterns
  • Metrics and alerting
  • Replay with a dedicated reprocessing job
  • A PyFlink version of the side output pattern

The goal is simple: a bad message should never silently disappear, and it should never silently stop the stream.

Why Poison Messages Break Otherwise Healthy Pipelines

A poison message is any record that consistently fails processing.

Typical examples include:

  • Malformed JSON
  • Incompatible schema versions
  • Missing required fields
  • Invalid business values
  • Records that trigger unexpected code paths
  • Messages that repeatedly fail downstream enrichment calls

Without DLQ handling, the failure path usually looks like this:

  1. The record enters the pipeline
  2. Deserialization or validation throws an exception
  3. The operator fails
  4. Flink restarts from the last checkpoint
  5. The same record is consumed again
  6. The same exception happens again

That loop can continue indefinitely.
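To make the loop concrete, here is a small plain-Java simulation with no Flink involved. It is a sketch under simplifying assumptions: the checkpoint is reduced to a committed offset that only advances on success, "poison" means any string that is not an integer, and the names `PoisonLoopDemo`, `runWithoutDlq`, and `runWithDlq` are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

final class PoisonLoopDemo {

    // Stand-in for parsing and validation: non-integer input is "poison".
    static int process(String record) {
        return Integer.parseInt(record);
    }

    // Without a DLQ, a failure forces a restart from the last committed offset.
    // Returns how many records were processed before the restart budget ran out.
    static int runWithoutDlq(List<String> input, int maxRestarts) {
        int committedOffset = 0;
        int restarts = 0;
        while (committedOffset < input.size() && restarts <= maxRestarts) {
            try {
                process(input.get(committedOffset));
                committedOffset++;   // progress is made only on success
            } catch (NumberFormatException e) {
                restarts++;          // the same offset is read again after "recovery"
            }
        }
        return committedOffset;
    }

    // With a DLQ, the failing record is quarantined and the stream moves on.
    static int runWithDlq(List<String> input, List<String> deadLetters) {
        int processed = 0;
        for (String record : input) {
            try {
                process(record);
                processed++;
            } catch (NumberFormatException e) {
                deadLetters.add(record);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> input = List.of("1", "2", "not-a-number", "4", "5");
        List<String> dlq = new ArrayList<>();
        System.out.println("without DLQ: " + runWithoutDlq(input, 10));
        System.out.println("with DLQ: " + runWithDlq(input, dlq) + ", quarantined " + dlq);
    }
}
```

However large the restart budget, the first variant never gets past the record in front of the poison message, while the second processes every valid record.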

The result is predictable:

  • Throughput drops to zero
  • Downstream consumers starve
  • Checkpoint recovery does not help
  • On-call engineers get paged for a problem caused by one record

This is why DLQ handling is not just an error-handling convenience. It is a core reliability pattern.

What a DLQ Should Look Like in Flink

In a streaming architecture, a DLQ is a durable destination for records that could not be processed successfully.

For Flink, that means the DLQ record should usually include:

  • Raw payload
  • Error type
  • Error message
  • Stack trace or summarized failure context
  • Failure timestamp
  • Source metadata such as topic, partition, or offset when available

That information matters because a DLQ is only useful if someone can answer two questions later:

  1. Why did this record fail?
  2. How do I replay it safely once the issue is fixed?

If you only log the exception, you lose replayability. If you only store the payload, you lose debugging context. If you drop the record entirely, you lose both.

So the design target is not "catch exceptions." The design target is durable, observable, replayable failure handling.

Pattern 1: Use Side Outputs as the Core DLQ Primitive

The most natural DLQ mechanism in Flink is the side output.

A side output allows one operator to emit records to multiple streams:

  • The main stream for successful records
  • One or more side streams for failures, late data, or quarantined records

That makes it the right primitive for DLQ routing.

Define the DLQ Envelope and Output Tag

Java
 
import org.apache.flink.util.OutputTag;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public static final OutputTag<DeadLetterRecord> DLQ_TAG =
    new OutputTag<DeadLetterRecord>("dead-letter-queue") {};

public record DeadLetterRecord(
    String rawPayload,
    String errorType,
    String errorMessage,
    String stackTrace,
    long failedAtEpochMs,
    String sourceTopicPartition,
    long sourceOffset
) {}


The important point here is that the DLQ record is not just the failed payload. It is an envelope that preserves enough context for triage and replay.

Route Failures Inside a ProcessFunction

Java
 
public class EntityEventProcessor
        extends ProcessFunction<String, EntityEvent> {

    // Static so the mapper is not serialized with the function instance.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    // ProcessFunction.Context does not expose the input element or its Kafka
    // coordinates. To capture topic, partition, and offset, attach them to
    // the record in the source's deserialization schema.
    private static final String SOURCE_UNKNOWN = "unknown";

    @Override
    public void processElement(
            String rawMessage,
            Context ctx,
            Collector<EntityEvent> out) {

        try {
            EntityEvent event = parseAndValidate(rawMessage);
            out.collect(event);

        } catch (JsonProcessingException e) {
            // Parent of JsonParseException and JsonMappingException.
            ctx.output(DLQ_TAG,
                toDeadLetter(rawMessage, "JSON_PARSE_FAILURE", e));

        } catch (SchemaValidationException e) {
            ctx.output(DLQ_TAG,
                toDeadLetter(rawMessage, "SCHEMA_VALIDATION_FAILURE", e));

        } catch (Exception e) {
            ctx.output(DLQ_TAG,
                toDeadLetter(rawMessage, "UNKNOWN_FAILURE", e));
        }
    }

    // Builds the DLQ envelope in one place so every branch records the same fields.
    private DeadLetterRecord toDeadLetter(
            String raw, String errorType, Exception e) {
        return new DeadLetterRecord(
            raw, errorType, e.getMessage(), getStackTrace(e),
            System.currentTimeMillis(), SOURCE_UNKNOWN, -1L);
    }

    private EntityEvent parseAndValidate(String raw)
            throws JsonProcessingException, SchemaValidationException {
        EntityEvent event = objectMapper.readValue(raw, EntityEvent.class);
        if (event.entityId() == null || event.entityId().isBlank()) {
            throw new SchemaValidationException("entityId is required");
        }
        if (event.timestamp() <= 0) {
            throw new SchemaValidationException("timestamp must be positive");
        }
        return event;
    }
}


This is the minimum viable DLQ pattern, and it already solves the most important operational problem: bad records no longer stop good ones.
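The listing above calls `getStackTrace(e)` without defining it. One possible implementation, using only the JDK, is sketched below. The class name `StackTraces` and the `MAX_CHARS` cap are assumptions, not part of the original code; the cap is there so a single deep trace cannot bloat the DLQ record.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

final class StackTraces {

    // Hypothetical upper bound on the stored trace length.
    static final int MAX_CHARS = 4_000;

    // Renders the throwable's stack trace as text, truncated to MAX_CHARS.
    static String getStackTrace(Throwable t) {
        StringWriter buffer = new StringWriter();
        try (PrintWriter writer = new PrintWriter(buffer)) {
            t.printStackTrace(writer);
        }
        String trace = buffer.toString();
        return trace.length() <= MAX_CHARS
            ? trace
            : trace.substring(0, MAX_CHARS) + "... [truncated]";
    }

    public static void main(String[] args) {
        System.out.println(
            getStackTrace(new IllegalStateException("entityId is required")));
    }
}
```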

Wire the Main Stream and DLQ Stream

Java
 
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> kafkaSource = env
    .fromSource(buildKafkaSource(), WatermarkStrategy.noWatermarks(),
                "entity-events-source");

SingleOutputStreamOperator<EntityEvent> processed =
    kafkaSource.process(new EntityEventProcessor());

DataStream<EntityEvent> goodEvents = processed;
DataStream<DeadLetterRecord> deadLetters =
    processed.getSideOutput(DLQ_TAG);

goodEvents.sinkTo(buildDownstreamKafkaSink());
deadLetters.sinkTo(buildDlqKafkaSink());

env.execute("Entity Resolution Pipeline");


If you do nothing else, do this. Side outputs should be the default DLQ foundation in Flink.

Pattern 2: Retry Transient Failures Before Escalating to DLQ

Not every failure belongs in the DLQ immediately.

Some failures are transient:

  • A downstream service is temporarily unavailable
  • A database call times out
  • An external API is rate-limited
  • A network dependency is briefly unstable

If you send all of those directly to the DLQ, you create noise and bury the truly bad records.

The better pattern is:

  1. Retry transient failures a limited number of times
  2. Use exponential backoff
  3. Escalate to DLQ only after retries are exhausted
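The schedule these three steps imply can be worked out in isolation before any Flink code is involved. The sketch below assumes a 500 ms base delay and three retries, matching the constants in the listing that follows; the `Backoff` class and the 30-second cap are hypothetical additions.

```java
final class Backoff {

    static final long BASE_BACKOFF_MS = 500L;
    static final long MAX_BACKOFF_MS = 30_000L;  // assumed cap, not in the article

    // Delay before retry number `attempt` (0-based): base * 2^attempt, capped.
    static long delayMs(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Clamp the shift so very large attempt counts cannot overflow.
        int shift = Math.min(attempt, 20);
        return Math.min(BASE_BACKOFF_MS << shift, MAX_BACKOFF_MS);
    }

    // True once the retry budget is spent and the record belongs in the DLQ.
    static boolean shouldEscalateToDlq(int retriesSoFar, int maxRetries) {
        return retriesSoFar >= maxRetries;
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 3; attempt++) {
            System.out.println("retry " + (attempt + 1) + " after " + delayMs(attempt) + " ms");
        }
    }
}
```

With these values, a record waits 500 ms, 1,000 ms, and 2,000 ms across its three retries before escalation.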

Retry With KeyedProcessFunction and Timers

Java
 
public class RetryingEnrichmentProcessor
        extends KeyedProcessFunction<String, EntityEvent, EnrichedEvent> {

    private static final int MAX_RETRIES = 3;
    private static final long BASE_BACKOFF_MS = 500L;

    private transient ValueState<Integer> retryCountState;
    private transient ValueState<EntityEvent> pendingEventState;

    @Override
    public void open(Configuration parameters) {
        retryCountState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("retry-count", Integer.class));
        pendingEventState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("pending-event", EntityEvent.class));
    }

    @Override
    public void processElement(
            EntityEvent event,
            Context ctx,
            Collector<EnrichedEvent> out) throws Exception {

        try {
            EnrichedEvent enriched = callEnrichmentService(event);
            retryCountState.clear();
            pendingEventState.clear();
            out.collect(enriched);

        } catch (TransientServiceException e) {
            int retries = retryCountState.value() == null
                ? 0 : retryCountState.value();

            if (retries >= MAX_RETRIES) {
                retryCountState.clear();
                pendingEventState.clear();
                ctx.output(DLQ_TAG, new DeadLetterRecord(
                    event.toString(),
                    "MAX_RETRIES_EXCEEDED",
                    "Failed after " + MAX_RETRIES + " retries: " + e.getMessage(),
                    getStackTrace(e),
                    System.currentTimeMillis(),
                    ctx.getCurrentKey(),
                    -1L
                ));
            } else {
                retryCountState.update(retries + 1);
                pendingEventState.update(event);
                long backoffMs = BASE_BACKOFF_MS * (long) Math.pow(2, retries);
                ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + backoffMs
                );
            }

        } catch (PoisonMessageException e) {
            ctx.output(DLQ_TAG, new DeadLetterRecord(
                event.toString(),
                "POISON_MESSAGE",
                e.getMessage(),
                getStackTrace(e),
                System.currentTimeMillis(),
                ctx.getCurrentKey(),
                -1L
            ));
        }
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<EnrichedEvent> out) throws Exception {

        EntityEvent pending = pendingEventState.value();
        if (pending == null) return;

        try {
            EnrichedEvent enriched = callEnrichmentService(pending);
            retryCountState.clear();
            pendingEventState.clear();
            out.collect(enriched);

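        } catch (TransientServiceException e) {
            // Null-safe read in case the counter was cleared before the timer fired.
            Integer stored = retryCountState.value();
            int retries = stored == null ? 0 : stored;
            if (retries >= MAX_RETRIES) {
                retryCountState.clear();
                pendingEventState.clear();
                ctx.output(DLQ_TAG, new DeadLetterRecord(
                    pending.toString(),
                    "MAX_RETRIES_EXCEEDED",
                    "Timer retry exhausted: " + e.getMessage(),
                    getStackTrace(e),
                    System.currentTimeMillis(),
                    ctx.getCurrentKey(),
                    -1L
                ));
            } else {
                retryCountState.update(retries + 1);
                long backoffMs = BASE_BACKOFF_MS * (long) Math.pow(2, retries);
                ctx.timerService().registerProcessingTimeTimer(
                    timestamp + backoffMs
                );
            }

        } catch (PoisonMessageException e) {
            // A retry can also surface a permanent failure. Quarantine it here
            // so it cannot fail the operator and restart the job.
            retryCountState.clear();
            pendingEventState.clear();
            ctx.output(DLQ_TAG, new DeadLetterRecord(
                pending.toString(),
                "POISON_MESSAGE",
                e.getMessage(),
                getStackTrace(e),
                System.currentTimeMillis(),
                ctx.getCurrentKey(),
                -1L
            ));
        }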
    }
}


Why This Works Especially Well in Flink

This pattern is stronger in Flink than in many other stream processors because timers and state are checkpointed.

That means:

  • Retry counters survive restarts
  • Pending events survive restarts
  • Scheduled retries resume after recovery

In other words, the retry workflow itself is fault-tolerant.

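That is exactly what you want when handling transient failures in a long-running stream.

One caveat: both state handles are scoped to the current key, so the example tracks only one pending event per key. If a second event for the same key fails while the first is still waiting on its timer, it overwrites the pending event and shares its retry count. Pipelines that can have several in-flight events per key need a ListState or MapState of pending events instead.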

Pattern 3: Split the DLQ by Failure Type

Once a pipeline matures, a single DLQ topic usually becomes too coarse.

Schema failures, business validation failures, exhausted retries, and unknown exceptions all end up mixed together. That makes triage slower and replay harder.

A better pattern is to classify failures and route them to separate DLQ streams.

Define Failure Tiers

Java
 
public enum DlqTier {
    TRANSIENT_EXHAUSTED,
    SCHEMA_INVALID,
    BUSINESS_RULE,
    UNKNOWN
}


Route by Exception Class

Java
 
public class TieredDlqRouter
        extends ProcessFunction<String, EntityEvent> {

    @Override
    public void processElement(
            String raw, Context ctx, Collector<EntityEvent> out) {

        try {
            EntityEvent event = parse(raw);
            validate(event);
            out.collect(event);

        } catch (JsonParseException | JsonMappingException e) {
            route(ctx, raw, DlqTier.SCHEMA_INVALID, e);

        } catch (BusinessValidationException e) {
            route(ctx, raw, DlqTier.BUSINESS_RULE, e);

        } catch (Exception e) {
            route(ctx, raw, DlqTier.UNKNOWN, e);
        }
    }

    private void route(Context ctx, String raw,
                       DlqTier tier, Exception e) {
        OutputTag<DeadLetterRecord> tag = getTierTag(tier);
        ctx.output(tag, new DeadLetterRecord(
            raw, tier.name(), e.getMessage(),
            getStackTrace(e), System.currentTimeMillis(), "", -1L
        ));
    }
}


Define One Output Tag Per Tier

Java
 
public static final OutputTag<DeadLetterRecord> DLQ_SCHEMA =
    new OutputTag<>("dlq-schema-invalid") {};
public static final OutputTag<DeadLetterRecord> DLQ_BUSINESS =
    new OutputTag<>("dlq-business-rule") {};
public static final OutputTag<DeadLetterRecord> DLQ_UNKNOWN =
    new OutputTag<>("dlq-unknown") {};
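The router's `route` helper calls `getTierTag(tier)`, which the listings do not show. A minimal sketch of that lookup follows. To keep it runnable without Flink on the classpath it maps each tier to the tag id string; in the real job the map values would be the `OutputTag` constants themselves. The article defines no tag for `TRANSIENT_EXHAUSTED`, so falling back to the unknown tier is an assumption, as is the `TierTags` class name.

```java
import java.util.EnumMap;
import java.util.Map;

enum DlqTier { TRANSIENT_EXHAUSTED, SCHEMA_INVALID, BUSINESS_RULE, UNKNOWN }

final class TierTags {

    private static final Map<DlqTier, String> TAG_IDS = new EnumMap<>(DlqTier.class);

    static {
        TAG_IDS.put(DlqTier.SCHEMA_INVALID, "dlq-schema-invalid");
        TAG_IDS.put(DlqTier.BUSINESS_RULE, "dlq-business-rule");
        TAG_IDS.put(DlqTier.UNKNOWN, "dlq-unknown");
    }

    // Tiers without a dedicated tag fall back to the catch-all (assumed behavior).
    static String tagIdFor(DlqTier tier) {
        return TAG_IDS.getOrDefault(tier, TAG_IDS.get(DlqTier.UNKNOWN));
    }

    public static void main(String[] args) {
        for (DlqTier tier : DlqTier.values()) {
            System.out.println(tier + " -> " + tagIdFor(tier));
        }
    }
}
```

An explicit fallback matters because emitting to a tag that no sink consumes would drop the record silently.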


Sink Each Tier Independently

Java
 
SingleOutputStreamOperator<EntityEvent> processed =
    kafkaSource.process(new TieredDlqRouter());

processed.getSideOutput(DLQ_SCHEMA)
    .sinkTo(buildKafkaSink("dlq.schema-invalid"));

processed.getSideOutput(DLQ_BUSINESS)
    .sinkTo(buildKafkaSink("dlq.business-rule"));

processed.getSideOutput(DLQ_UNKNOWN)
    .sinkTo(buildKafkaSink("dlq.unknown"));


This makes the DLQ operationally useful instead of just technically correct.

For example:

  • Schema failures can be routed to the producer team
  • Business rule failures can feed data quality workflows
  • Unknown failures can trigger higher-severity alerting

Pattern 4: Choose DLQ Sinks Based on How You Plan To Recover

Once records are routed to a DLQ stream, they need a durable destination. In practice, the two most common choices are Kafka and object storage.

Kafka DLQ Sink

Kafka is the right choice when you want:

  • Near-real-time inspection
  • Streaming replay
  • Operational integration with existing consumers

Java
 
private static KafkaSink<DeadLetterRecord> buildDlqKafkaSink(
        String topicName) {

    return KafkaSink.<DeadLetterRecord>builder()
        .setBootstrapServers("kafka-broker:9092")
        .setRecordSerializer(
            KafkaRecordSerializationSchema.<DeadLetterRecord>builder()
                .setTopic(topicName)
                // flink-json's JsonSerializationSchema takes no Class argument.
                .setValueSerializationSchema(
                    new JsonSerializationSchema<DeadLetterRecord>())
                // Explicit charset avoids depending on the platform default.
                .setKeySerializationSchema(
                    record -> record.errorType()
                        .getBytes(StandardCharsets.UTF_8))
                .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
}


S3 DLQ Sink

Object storage is the better choice when you want:

  • Long retention
  • Low-cost quarantine
  • Batch replay with Spark or Athena
  • Partitioned storage by date or error type

Java
 
private static FileSink<DeadLetterRecord> buildS3DlqSink() {
    return FileSink
        .forRowFormat(
            new Path("s3://your-bucket/dlq/entity-resolution/"),
            // JsonRowEncoder is not a Flink class. It stands for your own
            // Encoder<DeadLetterRecord> that writes one JSON object per line.
            new JsonRowEncoder<>(DeadLetterRecord.class)
        )
        .withRollingPolicy(
            DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofMinutes(15))
                .withInactivityInterval(Duration.ofMinutes(5))
                .withMaxPartSize(MemorySize.ofMebiBytes(128))
                .build()
        )
        .withBucketAssigner(
            // The argument is a DateTimeFormatter pattern, so literal text
            // must be wrapped in single quotes or it is parsed as pattern letters.
            new DateTimeBucketAssigner<>(
                "'error-type=unknown/year='yyyy'/month='MM'/day='dd'/hour='HH")
        )
        .build();
}
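
`DateTimeBucketAssigner` formats only the time, so the `error-type` segment in the listing above is a fixed string. Partitioning by each record's actual error type would need a custom `BucketAssigner` whose `getBucketId` derives the path from the element. The sketch below shows only that path computation in plain Java. `DlqBucketPath` and `bucketId` are hypothetical names, and UTC is an assumed time zone.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

final class DlqBucketPath {

    // Quoted literals keep words such as "year" from being read as pattern letters.
    private static final DateTimeFormatter TIME_PART = DateTimeFormatter
        .ofPattern("'year='yyyy'/month='MM'/day='dd'/hour='HH")
        .withZone(ZoneOffset.UTC);

    // Builds a Hive-style partition path from the error type and failure time.
    static String bucketId(String errorType, long failedAtEpochMs) {
        // Normalize the error type into a filesystem-friendly path segment.
        String safeType = errorType == null || errorType.isBlank()
            ? "unknown"
            : errorType.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9_-]", "_");
        return "error-type=" + safeType + "/"
            + TIME_PART.format(Instant.ofEpochMilli(failedAtEpochMs));
    }

    public static void main(String[] args) {
        System.out.println(bucketId("SCHEMA_INVALID", System.currentTimeMillis()));
    }
}
```

Using the record's failure timestamp rather than the write time keeps replayed or late-written records in the partition where an investigator would look for them.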


A practical production pattern is to use:

  • Kafka for short-term operational handling
  • S3 for long-term quarantine and replay

That gives you both fast response and durable history.

Pattern 5: Monitor DLQ Rate, Not Just Job Uptime

A DLQ that nobody watches is just a backlog with better branding.

Job uptime alone is not enough. A Flink job can stay green while quietly routing 10% of traffic to the DLQ.

That is still a production incident.

Add Metrics Inside the Operator

Java
 
public class MonitoredEntityEventProcessor
        extends ProcessFunction<String, EntityEvent> {

    private transient Counter dlqCounter;
    private transient Counter successCounter;
    private transient Histogram processingLatency;

    @Override
    public void open(Configuration parameters) {
        MetricGroup metrics = getRuntimeContext()
            .getMetricGroup()
            .addGroup("entity_resolution");

        dlqCounter = metrics.counter("dlq_routed_total");
        successCounter = metrics.counter("processed_success_total");
        processingLatency = metrics.histogram(
            "processing_latency_ms",
            new DescriptiveStatisticsHistogram(1000)
        );
    }

    @Override
    public void processElement(
            String raw, Context ctx, Collector<EntityEvent> out) {

        long start = System.currentTimeMillis();
        try {
            EntityEvent event = parseAndValidate(raw);
            successCounter.inc();
            out.collect(event);
        } catch (Exception e) {
            dlqCounter.inc();
            ctx.output(DLQ_TAG, buildDeadLetter(raw, e));
        } finally {
            processingLatency.update(System.currentTimeMillis() - start);
        }
    }
}


Alert on DLQ Rate

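A useful alert is DLQ throughput relative to total throughput:

YAML
 
# Metric names depend on the metrics reporter's scope format.
# Adjust the names below to match what your Prometheus reporter exports.
- alert: FlinkDlqRateHigh
  expr: |
    sum(rate(flink_entity_resolution_dlq_routed_total[5m]))
    /
    (
      sum(rate(flink_entity_resolution_dlq_routed_total[5m]))
      + sum(rate(flink_entity_resolution_processed_success_total[5m]))
    )
    > 0.01
  for: 2m
  labels:
    severity: warning
  annotations:
    summary: "DLQ rate exceeds 1% of total throughput"
    description: "Check the DLQ Kafka topics (for example dlq.schema-invalid) for upstream schema changes"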


As a rule of thumb:

  • Above 1% often indicates schema drift or producer issues
  • Above 5% usually indicates a broader systemic problem

The exact thresholds depend on the pipeline, but the principle does not: monitor DLQ rate as a first-class health signal.
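
The same rule of thumb can be expressed as a small function, which is handy for unit-testing thresholds before encoding them in alert rules. This is a sketch: `DlqHealth`, `dlqRatio`, and `classify` are hypothetical names, the ratio is taken against total throughput (DLQ plus success), and the 1% and 5% cut-offs are the ones quoted above.

```java
final class DlqHealth {

    enum Severity { OK, WARNING, CRITICAL }

    // Share of all processed records that ended up in the DLQ.
    static double dlqRatio(long dlqCount, long successCount) {
        long total = dlqCount + successCount;
        return total == 0 ? 0.0 : (double) dlqCount / total;
    }

    // Maps the ratio onto the rule-of-thumb thresholds.
    static Severity classify(long dlqCount, long successCount) {
        double ratio = dlqRatio(dlqCount, successCount);
        if (ratio > 0.05) {
            return Severity.CRITICAL;
        }
        if (ratio > 0.01) {
            return Severity.WARNING;
        }
        return Severity.OK;
    }

    public static void main(String[] args) {
        System.out.println(classify(20, 980));
    }
}
```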

Pattern 6: Replay With a Dedicated Reprocessing Job

A DLQ is only complete when replay is possible.

The cleanest design is a separate Flink job that reads from the DLQ topic and routes records back through the main processing logic.

Example Replay Job

Java
 
public class DlqReprocessingJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<DeadLetterRecord> dlqStream = env
            .fromSource(
                buildKafkaSource("dlq.schema-invalid"),
                WatermarkStrategy.noWatermarks(),
                "dlq-source"
            );

        DataStream<String> replayStream = dlqStream
            .filter(r -> r.failedAtEpochMs() >= START_EPOCH
                      && r.failedAtEpochMs() <= END_EPOCH)
            .map(DeadLetterRecord::rawPayload);

        SingleOutputStreamOperator<EntityEvent> reprocessed =
            replayStream.process(new EntityEventProcessor());

        reprocessed.sinkTo(buildDownstreamKafkaSink());

        reprocessed.getSideOutput(DLQ_TAG)
            .sinkTo(buildKafkaSink("dlq.permanent-quarantine"));

        env.execute("DLQ Reprocessing Job");
    }
}
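
The job above filters on `START_EPOCH` and `END_EPOCH`, which it leaves undefined, and sends records that fail a second time to `dlq.permanent-quarantine`. That selection and routing logic can be sketched and tested without Flink. In the example below, `ReplayPlan`, `DeadLetter`, and `Result` are hypothetical stand-ins, and the `nowValid` predicate represents the fixed processing logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class ReplayPlan {

    // Minimal stand-in for the article's DeadLetterRecord envelope.
    record DeadLetter(String rawPayload, long failedAtEpochMs) {}

    record Result(List<String> recovered, List<String> quarantined) {}

    // Replays records that failed inside [startEpochMs, endEpochMs].
    static Result replay(List<DeadLetter> dlq, long startEpochMs, long endEpochMs,
                         Predicate<String> nowValid) {
        List<String> recovered = new ArrayList<>();
        List<String> quarantined = new ArrayList<>();
        for (DeadLetter r : dlq) {
            boolean inWindow = r.failedAtEpochMs() >= startEpochMs
                && r.failedAtEpochMs() <= endEpochMs;
            if (!inWindow) {
                continue;                          // left in the DLQ for a later run
            }
            if (nowValid.test(r.rawPayload())) {
                recovered.add(r.rawPayload());     // goes to the downstream sink
            } else {
                quarantined.add(r.rawPayload());   // failed again: permanent quarantine
            }
        }
        return new Result(recovered, quarantined);
    }

    public static void main(String[] args) {
        List<DeadLetter> dlq = List.of(
            new DeadLetter("{\"entityId\":\"a\"}", 100L),
            new DeadLetter("{broken", 150L));
        System.out.println(replay(dlq, 50L, 200L, s -> s.endsWith("}")));
    }
}
```

Routing second-time failures to a separate destination is what keeps a replay from feeding the same records back into the DLQ it is draining.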


Why Replay Should Be a Separate Job

Keeping replay separate from the main pipeline gives you:

  • Independent scaling
  • Independent scheduling
  • Cleaner checkpoint behavior
  • Safer operational control

It also lets you drain backlogs on your own terms:

  • During off-peak hours
  • At reduced parallelism
  • At maximum parallelism when you need to catch up quickly

That separation keeps the main pipeline stable while still making recovery practical.

PyFlink Version: Same Pattern, Same Principle

If your team uses PyFlink, the same side output pattern applies.

Python
 
import time

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessFunction
from pyflink.datastream.output_tag import OutputTag

DLQ_TAG = OutputTag(
    "dead-letter-queue",
    Types.ROW_NAMED(
        ["raw_payload", "error_type", "error_message", "failed_at_ms"],
        [Types.STRING(), Types.STRING(), Types.STRING(), Types.LONG()]
    )
)

class EntityEventProcessor(ProcessFunction):
    def process_element(self, value, ctx):
        try:
            event = parse_and_validate(value)
            yield event
        except Exception as e:
            # Yielding a (tag, value) pair routes the row to the side output.
            yield DLQ_TAG, Row(
                raw_payload=str(value),
                error_type=type(e).__name__,
                error_message=str(e),
                failed_at_ms=int(time.time() * 1000)
            )

env = StreamExecutionEnvironment.get_execution_environment()
source_stream = env.from_source(...)

processed = source_stream.process(
    EntityEventProcessor(),
    output_type=Types.STRING()
)

good_events = processed
dead_letters = processed.get_side_output(DLQ_TAG)

good_events.sink_to(build_downstream_sink())
dead_letters.sink_to(build_dlq_sink())

env.execute("Entity Resolution Pipeline")


The syntax changes, but the design principle stays the same: good records continue, bad records are isolated and persisted.

Production Checklist

Before shipping a Flink pipeline, verify the following:

| Requirement | Why It Matters |
|---|---|
| Risky operators wrapped in try/catch | Prevents restart loops from unhandled exceptions |
| DLQ output tags use explicit typing | Avoids runtime serialization failures |
| DLQ sink is durable | Failed records must survive restarts |
| DLQ metrics are exported | Silent DLQ growth is otherwise invisible |
| Replay path exists and is tested | A DLQ without replay is just storage |
| DLQ retention is long enough | Teams need time to diagnose and replay |
| Permanent quarantine exists | Prevents infinite replay loops |
| Alerting is based on DLQ rate | Job health alone is not enough |


This checklist is worth automating in code review or deployment readiness checks. DLQ handling is too important to leave to convention.

Key Takeaways

If you are building Flink pipelines in production, the safest default is:

  • Use side outputs for DLQ routing
  • Retry transient failures before escalation
  • Classify failures into separate DLQ streams
  • Sink DLQ records durably
  • Export DLQ metrics
  • Replay through a dedicated job

The core rule is simple:

A bad message should never silently disappear, and it should never silently stop the stream.

That is what turns DLQ handling from a defensive coding trick into a real reliability pattern.

Environment Notes

The examples in this article target:

  • Apache Flink 1.18
  • Java 17
  • PyFlink 1.18

A few implementation notes:

  • The retry timer pattern requires a keyBy() so that the stream is keyed before KeyedProcessFunction is applied
  • RocksDB is usually the safer state backend for larger retry state
  • HashMap state backend can work well for smaller, latency-sensitive workloads
  • AT_LEAST_ONCE is usually sufficient for DLQ sinks

Final Thoughts

Poison messages are not rare in streaming systems. They are inevitable.

The real question is whether one bad record can take down an otherwise healthy pipeline. 

With the right DLQ design in Flink, the answer becomes no.

The stream keeps moving. Good records continue. Bad records are quarantined. Alerts fire. Replay remains possible. And the pipeline stays operational while the root cause is fixed.

That is the difference between a stream that works in staging and one that survives production.
