DZone
Reconciling Privacy Preferences Across Two Datastores With Snowflake and Airflow

This article explains how to turn privacy preference mismatches into a real compliance control using clear matching rules, safe fixes, and full auditability.

By Meihui Chen · Apr. 22, 26 · Tutorial

In one regulated system I worked on, a marketing preference was not a soft business attribute. It was a control with direct compliance consequences.

We had two datastores that both claimed to hold the current answer for customer non-solicitation preferences. One was the privacy preference system where customers submitted opt-outs. The other was a core system consumed by downstream communication workflows. Both were usually right. That was the problem.

“Usually right” is enough to create exposure. A customer opts out in one system, the update does not land in the other one in time, and a campaign still picks them up. The technical issue looks small. The operational fallout is not.

At this scale, it mattered. The population covered roughly 12 million records across several product lines and several communication channels. At peak, the reconciliation output showed hundreds of thousands of discrepancies before the cleanup cycle stabilized. The job was to drive that down, then keep it down with a repeatable control.

This article walks through the design that worked: Snowflake for deterministic state derivation and audit tables, plus Airflow for orchestration, guardrails, and export control.

System Shape

At a high level, the flow looked like this:

  1. Ingest raw privacy events and raw core-system snapshots into append-only staging tables.
  2. Build a canonical “latest privacy state” per customer and channel.
  3. Build a matching “latest core state” per customer and channel.
  4. Compare the two using explicit precedence and timing rules.
  5. Generate a replayable remediation delta for the records that need to be fixed.
  6. Gate the export if counts spike beyond an expected range.
  7. Export the patch set and keep an audit trail for investigations.

That separation between raw history, curated state, mismatch audit, and remediation output made the pipeline easier to reason about and much safer to rerun.

Define the Mismatch First

The first thing I learned was that a mismatch cannot be defined as “two raw fields are different.”

That is too naive for preference data.

The effective state depends on more than the raw value. You need to account for recency, scope, product applicability, and identity mapping. In my case, I normalized records into the same shape before comparing them:

  • customer_key
  • channel
  • preference_state
  • effective_from_ts
  • source_system
  • updated_ts

Once both inputs are reduced to the same contract, the reconciliation logic becomes deterministic instead of interpretive. That matters when someone asks, six months later, why a record was classified a certain way.
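To make the shared contract concrete, here is a minimal Python sketch of the normalized record. The field names come from the list above; the `normalize_record` helper, the upper-casing of channel and state, and the `OPT_IN`/`OPT_OUT` vocabulary check are illustrative assumptions, not the exact rules the pipeline used.

```python
from dataclasses import dataclass
from datetime import datetime

# Assumed state vocabulary, matching the comment on stg_privacy_events.
VALID_STATES = frozenset({"OPT_IN", "OPT_OUT"})


@dataclass(frozen=True)
class PreferenceRecord:
    """One normalized preference row; both sources are mapped into this shape."""
    customer_key: str
    channel: str
    preference_state: str
    effective_from_ts: datetime
    source_system: str
    updated_ts: datetime


def normalize_record(customer_key: str, channel: str, preference_state: str,
                     effective_from_ts: datetime, source_system: str,
                     updated_ts: datetime) -> PreferenceRecord:
    """Hypothetical normalizer: trims keys, upper-cases codes, rejects unknown states."""
    state = preference_state.strip().upper()
    if state not in VALID_STATES:
        # Fail loudly instead of guessing; an unknown value is a data issue to investigate.
        raise ValueError(f"unknown preference_state: {preference_state!r}")
    return PreferenceRecord(
        customer_key=customer_key.strip(),
        channel=channel.strip().upper(),
        preference_state=state,
        effective_from_ts=effective_from_ts,
        source_system=source_system,
        updated_ts=updated_ts,
    )
```

Rejecting unknown states at this boundary keeps an unexpected upstream value from being silently treated as either an opt-in or an opt-out.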

Data Model: Keep the Raw Layer Boring 

I kept staging append-only.

That was deliberate. When upstream feeds are messy, the worst thing you can do is overwrite your first trace of what happened. I wanted raw history preserved exactly as it arrived, even if timestamps were inconsistent or events showed up late.

SQL
 
create or replace table stg_privacy_events (
    customer_key varchar,
    channel varchar,
    preference_state varchar,  -- 'OPT_IN' or 'OPT_OUT'
    event_ts timestamp_ntz,
    ingested_ts timestamp_ntz default current_timestamp(),
    source_record_id varchar
);

create or replace table stg_core_preferences (
    customer_key varchar,
    channel varchar,
    preference_state varchar,
    updated_ts timestamp_ntz,
    snapshot_date date,
    ingested_ts timestamp_ntz default current_timestamp()
);


From there, I derived the latest privacy state per customer and channel.

SQL
 
create or replace table curated_privacy_latest as
select
    customer_key,
    channel,
    preference_state,
    event_ts as updated_ts
from (
    select
        customer_key,
        channel,
        preference_state,
        event_ts,
        ingested_ts,
        row_number() over (
            partition by customer_key, channel
            order by event_ts desc, ingested_ts desc
        ) as rn
    from stg_privacy_events
)
where rn = 1;


The tie-break on ingested_ts was necessary. The dataset contained late-arriving events and duplicate event timestamps, and ordering on event_ts alone gave unpredictable results: rows with the same event_ts could come out in a different order from one run to the next.
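The same rule can be expressed outside SQL, which is handy for unit-testing it. This Python sketch mirrors the `row_number()` ordering above: for each `(customer_key, channel)` it keeps the event with the greatest `(event_ts, ingested_ts)` pair. The tuple-based event shape is an assumption for illustration.

```python
from datetime import datetime
from typing import Dict, Iterable, Tuple

# (customer_key, channel, preference_state, event_ts, ingested_ts); assumed shape
Event = Tuple[str, str, str, datetime, datetime]


def latest_state(events: Iterable[Event]) -> Dict[Tuple[str, str], Tuple[str, datetime]]:
    """Return {(customer_key, channel): (preference_state, event_ts)} for the newest event.

    Ordering is event_ts first, then ingested_ts as the tie-break, mirroring
    `order by event_ts desc, ingested_ts desc` with `rn = 1`.
    """
    best: Dict[Tuple[str, str], Event] = {}
    for ev in events:
        key = (ev[0], ev[1])
        current = best.get(key)
        # Tuple comparison applies the tie-break only when event_ts is equal.
        if current is None or (ev[3], ev[4]) > (current[3], current[4]):
            best[key] = ev
    return {key: (ev[2], ev[3]) for key, ev in best.items()}
```

A late-arriving event with an older event_ts loses to the newer one even though it was ingested later. If both timestamps can tie exactly, a third sort key such as `source_record_id` would be needed to make the result fully deterministic.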

Build the Mismatch Report with Explicit Priority

The mismatch report is where reviewers are most likely to question the logic, so the classification order has to be explicit. I check conditions in the following order:

  1. Missing record conditions
  2. True state disagreement
  3. Staleness, where the values match but the core record is behind

The order is deliberate. Presence comes first, because there is nothing to compare against if one side is missing. State disagreement comes next, because it is a stronger discrepancy than staleness. Staleness is checked last, and only when both records exist and the effective state already agrees.

SQL
 
create or replace table audit_preference_mismatches as
with core_latest as (
    -- Latest core value per customer and channel, same tie-break as the privacy side
    select
        customer_key,
        channel,
        preference_state,
        updated_ts
    from stg_core_preferences
    qualify row_number() over (
        partition by customer_key, channel
        order by updated_ts desc, ingested_ts desc
    ) = 1
),
classified as (
    select
        coalesce(p.customer_key, c.customer_key) as customer_key,
        coalesce(p.channel, c.channel) as channel,
        p.preference_state as expected_state,
        p.updated_ts as expected_updated_ts,
        c.preference_state as core_state,
        c.updated_ts as core_updated_ts,
        -- CASE branches are evaluated in order: presence, then state, then staleness
        case
            when p.customer_key is null then 'MISSING_IN_PRIVACY'
            when c.customer_key is null then 'MISSING_IN_CORE'
            when p.preference_state <> c.preference_state then 'STATE_DIFF'
            when p.updated_ts > c.updated_ts then 'STALE_IN_CORE'
            else 'OK'
        end as mismatch_type
    from curated_privacy_latest p
    full outer join core_latest c
        on p.customer_key = c.customer_key
       and p.channel = c.channel
)
-- QUALIFY requires a window function, so a plain WHERE filters out matching rows
select *
from classified
where mismatch_type <> 'OK';
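To test the precedence order in isolation, the same classification can be sketched in Python. This is an illustrative mirror of the CASE expression above, not code from the pipeline: each side is assumed to be a dict keyed by `(customer_key, channel)` holding `(preference_state, updated_ts)`, and iterating over the union of keys plays the role of the full outer join.

```python
from datetime import datetime
from typing import Dict, Optional, Tuple

Key = Tuple[str, str]            # (customer_key, channel)
State = Tuple[str, datetime]     # (preference_state, updated_ts)


def classify(privacy: Optional[State], core: Optional[State]) -> str:
    """Apply the checks in the same order as the SQL CASE expression."""
    if privacy is None:
        return "MISSING_IN_PRIVACY"
    if core is None:
        return "MISSING_IN_CORE"
    if privacy[0] != core[0]:
        return "STATE_DIFF"
    if privacy[1] > core[1]:
        return "STALE_IN_CORE"
    return "OK"


def mismatch_report(privacy: Dict[Key, State], core: Dict[Key, State]) -> Dict[Key, str]:
    """Full-outer-join both sides by key and keep only non-OK classifications."""
    report = {}
    for key in privacy.keys() | core.keys():
        kind = classify(privacy.get(key), core.get(key))
        if kind != "OK":
            report[key] = kind
    return report
```

Because the state check runs before the staleness check, a record whose state differs and is also older in core is reported once, as `STATE_DIFF`.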


For product-specific exceptions, I kept the scope logic out of the main query and pushed it into a reference table.

SQL
 
create or replace table ref_channel_product_scope (
    product_code varchar,
    channel varchar,
    is_applicable boolean
);


That keeps the reconciliation query from growing into one unreadable block of nested conditions. It also helps maintainability: adding or retiring a product-channel exception becomes a row change in the reference table rather than a query rewrite.
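Here is a minimal sketch of how such a reference table could be applied before comparison, written in Python for testability. It assumes each preference record can be tied to a `product_code` (the staging tables shown earlier do not carry one), and it treats a product-channel pair with no scope row as in scope, so a gap in the reference data surfaces as a mismatch instead of being silently dropped. Both are assumptions, not rules from the original pipeline.

```python
from typing import Dict, Iterable, List, Tuple

# Rows of ref_channel_product_scope: (product_code, channel, is_applicable)
ScopeRow = Tuple[str, str, bool]


def build_scope(rows: Iterable[ScopeRow]) -> Dict[Tuple[str, str], bool]:
    """Index scope rows by (product_code, channel)."""
    return {(product, channel): applicable for product, channel, applicable in rows}


def in_scope(records: Iterable[dict], scope: Dict[Tuple[str, str], bool]) -> List[dict]:
    """Keep records whose product-channel pair applies.

    Missing scope rows default to applicable (assumption): over-reporting is
    safer than silently skipping a channel that should be reconciled.
    """
    return [
        r for r in records
        if scope.get((r["product_code"], r["channel"]), True)
    ]
```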

Fix Forward with Replayable Patch Sets

A mismatch report by itself is not a control; it is an observation. The control is the closed loop: generating the exact updates the downstream system needs to apply, preserving the generated payload, and making reruns safe.

SQL
 
create or replace table remediation_core_updates as
select
    customer_key,
    channel,
    expected_state as new_preference_state,
    expected_updated_ts as effective_ts,
    current_timestamp() as generated_ts
from audit_preference_mismatches
where mismatch_type in ('STATE_DIFF', 'STALE_IN_CORE')
  and expected_state is not null;


Then add an idempotency hash so the same patch is not sent twice unless the payload changed.

SQL
 
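alter table remediation_core_updates add column update_hash varchar;

-- generated_ts is left out on purpose so the hash depends only on the payload.
-- The explicit format mask keeps the hash stable regardless of session timestamp settings.
update remediation_core_updates
set update_hash = sha2(
    customer_key || '|' ||
    channel || '|' ||
    new_preference_state || '|' ||
    to_varchar(effective_ts, 'YYYY-MM-DD HH24:MI:SS.FF9'),
    256
);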


That hash did two jobs for us: it made retries safe, and it gave us a stable key for reconciling outbound payloads against acknowledgments from the receiving system.
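The idempotency check itself is simple enough to sketch in Python. This is an illustration rather than the pipeline's code: `patch_hash` builds a pipe-delimited payload like the SQL above and hashes it with SHA-256, and `unsent_patches` assumes you keep a set of hashes already sent to the receiving system (how that set is stored is left open). The digest will only match one computed in Snowflake if both sides serialize the timestamp with exactly the same format.

```python
import hashlib
from datetime import datetime
from typing import Iterable, List, Set, Tuple

# (customer_key, channel, new_preference_state, effective_ts)
Patch = Tuple[str, str, str, datetime]


def patch_hash(patch: Patch) -> str:
    """SHA-256 over a pipe-delimited payload with a pinned timestamp format."""
    customer_key, channel, state, effective_ts = patch
    payload = "|".join([
        customer_key,
        channel,
        state,
        effective_ts.strftime("%Y-%m-%d %H:%M:%S.%f"),
    ])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def unsent_patches(patches: Iterable[Patch], sent_hashes: Set[str]) -> List[Tuple[Patch, str]]:
    """Drop patches whose exact payload was already sent or repeats within this batch.

    A changed payload produces a new hash, so it is sent again.
    """
    seen = set(sent_hashes)
    out = []
    for p in patches:
        h = patch_hash(p)
        if h not in seen:
            seen.add(h)
            out.append((p, h))
    return out
```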

Airflow as a Control Layer

Airflow wasn't there just to run SQL on a schedule. It was the control surface around the pipeline. I wanted run history, retry behavior, gating, and one place to stop a bad export before it reached the downstream system.

Here’s a simplified version of the DAG:

Python
 
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

DEFAULT_ARGS = {
    "owner": "data-platform",
    "depends_on_past": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=10),
}

def fetch_mismatch_count(**context):
    """Count rows in the mismatch report and hand the number to the gate via XCom."""
    hook = SnowflakeHook(snowflake_conn_id="snowflake_default")
    result = hook.get_first(
        "select count(*) from audit_preference_mismatches"
    )
    count_value = int(result[0] or 0)
    context["ti"].xcom_push(key="mismatch_count", value=count_value)

def fail_if_spike(**context):
    """Fail the run, which blocks the export, when mismatches exceed a fixed ceiling."""
    mismatch_count = int(context["ti"].xcom_pull(
        key="mismatch_count",
        task_ids="count_mismatches"
    ) or 0)

    # Fixed ceiling; downstream tasks never run if this raises.
    if mismatch_count > 50000:
        raise AirflowException(
            f"Mismatch spike detected: {mismatch_count}"
        )

with DAG(
    dag_id="privacy_preference_reconciliation",
    start_date=datetime(2023, 1, 1),
    schedule="0 6 * * *",  # daily at 06:00; Airflow 2.4+ name for schedule_interval
    catchup=False,
    max_active_runs=1,
    default_args=DEFAULT_ARGS,
) as dag:

    build_latest = SnowflakeOperator(
        task_id="build_curated_privacy_latest",
        snowflake_conn_id="snowflake_default",
        sql="sql/build_curated_privacy_latest.sql",
    )

    build_mismatches = SnowflakeOperator(
        task_id="build_mismatch_report",
        snowflake_conn_id="snowflake_default",
        sql="sql/build_audit_preference_mismatches.sql",
    )

    count_mismatches = PythonOperator(
        task_id="count_mismatches",
        python_callable=fetch_mismatch_count,
    )

    gate = PythonOperator(
        task_id="gate_on_mismatch_spike",
        python_callable=fail_if_spike,
    )

    build_remediation = SnowflakeOperator(
        task_id="build_remediation_delta",
        snowflake_conn_id="snowflake_default",
        sql="sql/build_remediation_core_updates.sql",
    )

    export_delta = SnowflakeOperator(
        task_id="export_delta_to_stage",
        snowflake_conn_id="snowflake_default",
        sql="""
        copy into @privacy_stage/core_updates/
        from (
            select
                customer_key,
                channel,
                new_preference_state,
                effective_ts,
                update_hash,
                generated_ts
            from remediation_core_updates
        )
        file_format = (type = csv field_delimiter = ',')
        header = true  -- write column names; skip_header applies only to loading
        overwrite = true;
        """,
    )

    build_latest >> build_mismatches >> count_mismatches >> gate
    gate >> build_remediation >> export_delta


Two details mattered a lot in practice.

First, max_active_runs=1 prevented overlapping runs when upstream ingestion lagged. Second, the gate blocked large exports when mismatch counts broke away from baseline, so a bad upstream load failed the run instead of reaching the downstream system.
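The DAG above gates on a fixed ceiling of 50,000. To follow the "broke away from baseline" idea more literally, one option is to compare the current count with recent history. The sketch below is an assumption, not what the pipeline ran: it uses the median of recent daily counts, a multiplier, and an absolute floor so that a near-zero steady state does not trip the gate on small noise. `fail_if_spike` could call it with counts read from a run-history table, which would also be hypothetical.

```python
from statistics import median
from typing import Sequence


def is_spike(current: int, history: Sequence[int],
             multiplier: float = 3.0, floor: int = 1000) -> bool:
    """Return True when `current` breaks away from the recent baseline.

    multiplier and floor are illustrative defaults, not tuned values.
    With no history yet, only the absolute floor applies.
    """
    if not history:
        return current > floor
    baseline = median(history)
    return current > max(baseline * multiplier, floor)
```

The median keeps a single unusual day in the history from dragging the baseline up, which a plain average would do.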

Security and Compliance Details that Matter in Implementation

Because this pipeline touched customer preference data, I kept the compliance discussion to engineering decisions rather than policy language. Three choices mattered:

  • PII minimization: the reconciliation tables held only canonical keys, channel, state, and timestamps. Mismatch detection didn't need any other customer attributes.
  • Access control: write access to remediation tables and export stages was restricted to the pipeline role. Investigators could read audit tables but could not generate or alter patches.
  • Retention and auditability: raw staging history stayed queryable for investigations, while generated delta files and mismatch snapshots were retained long enough to support remediation review and downstream acknowledgment checks.

That is what turned the job into a control instead of a periodic cleanup script.

What Actually Caused the Mismatches

The biggest failures were not fancy SQL problems. They were operational.

Key drift was one. Customer identifiers varied across systems and product lines. Without a canonical customer_key, you end up manufacturing false mismatches.

Timestamps were another. One source treated “last updated” as “last processed.” Another used local time with inconsistent normalization. I standardized timestamps on ingest and still kept ingestion time available as a tie-break.
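Here is a minimal sketch of that ingest-time normalization, assuming the target is naive UTC to match the `timestamp_ntz` columns. How each source's local offset is known is an assumption: the example takes a fixed UTC offset as a hypothetical per-source setting, which ignores daylight-saving changes, so a real feed would need the source's actual time zone rules.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def to_utc_ntz(ts: datetime, source_offset: Optional[timedelta] = None) -> datetime:
    """Normalize a timestamp to naive UTC for a timestamp_ntz column.

    - Aware timestamps are converted using their own offset.
    - Naive timestamps are interpreted with `source_offset` (hypothetical
      per-source setting); with no offset given they are assumed to be UTC already.
    """
    if ts.tzinfo is None:
        if source_offset is None:
            return ts
        ts = ts.replace(tzinfo=timezone(source_offset))
    return ts.astimezone(timezone.utc).replace(tzinfo=None)
```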

Late-arriving events also mattered. If you trust ingest time alone, you can violate customer intent. If you trust event time alone with no secondary sort, you get unstable results.

Then there was precedence. A global do-not-contact has to override a channel-level opt-in. That rule must live in the data model or the reconciliation logic, not in downstream interpretation.
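Here is a sketch of what encoding that precedence could look like, in Python for readability. It assumes global do-not-contact is represented as a preference on a pseudo-channel named `GLOBAL`; that name and representation are hypothetical, since the staging tables shown earlier don't reveal how the global flag was stored. The point is that the override is applied once, while deriving effective state, so every consumer sees the same answer.

```python
from typing import Dict

GLOBAL_CHANNEL = "GLOBAL"  # hypothetical pseudo-channel carrying the global do-not-contact flag


def effective_states(states: Dict[str, str]) -> Dict[str, str]:
    """Resolve per-channel states for one customer.

    `states` maps channel -> 'OPT_IN' / 'OPT_OUT'. A global OPT_OUT overrides
    every channel-level value; otherwise channel values stand as they are.
    """
    global_opt_out = states.get(GLOBAL_CHANNEL) == "OPT_OUT"
    return {
        channel: ("OPT_OUT" if global_opt_out else state)
        for channel, state in states.items()
        if channel != GLOBAL_CHANNEL
    }
```

Channels with no explicit record are left out here; whether a global opt-out should also materialize rows for them is a design decision this sketch does not make.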

Outcome

After the initial backfill and remediation cycle, mismatch counts dropped from the hundreds of thousands to under a hundred. At that point, the remaining cases were usually real data issues: mapping gaps, bad upstream records, or occasional race conditions.

More important than the first cleanup was the steady-state posture after that:

  • daily reconciliation
  • spike alerts
  • replayable deltas
  • queryable audit history

That was the difference between a one-time repair and an operating control.

Final Checklist

If you are building this kind of control, start here:

  • Define mismatch as effective-state disagreement, not raw field inequality.
  • Keep raw history separate from curated latest-state tables.
  • Design for late events and timestamp ties.
  • Encode precedence rules explicitly.
  • Generate replayable remediation payloads.
  • Put a gate in front of outbound updates.
  • Preserve an audit trail that an investigator can actually query.

The first hard problem is the mismatch definition. The second is the canonical key. Once those are right, the rest becomes much more ordinary engineering.


Opinions expressed by DZone contributors are their own.
