Reconciling Privacy Preferences Across Two Datastores With Snowflake and Airflow
This article explains how to turn privacy preference mismatches into a real compliance control using clear matching rules, safe fixes, and full auditability.
In one regulated system I worked on, a marketing preference was not a soft business attribute. It was a control with direct compliance consequences.
We had two datastores that both claimed to hold the current answer for customer non-solicitation preferences. One was the privacy preference system where customers submitted opt-outs. The other was a core system consumed by downstream communication workflows. Both were usually right. That was the problem.
“Usually right” is enough to create exposure. A customer opts out in one system, the update does not land in the other one in time, and a campaign still picks them up. The technical issue looks small. The operational fallout is not.
At this scale, it mattered. The population covered roughly 12 million records across several product lines and several communication channels. At peak, the reconciliation output showed hundreds of thousands of discrepancies before the cleanup cycle stabilized. The job was to drive that down, then keep it down with a repeatable control.
This article walks through the design that worked: Snowflake for deterministic state derivation and audit tables, plus Airflow for orchestration, guardrails, and export control.
System Shape
At a high level, the flow looked like this:
- Ingest raw privacy events and raw core-system snapshots into append-only staging tables.
- Build a canonical “latest privacy state” per customer and channel.
- Build a matching “latest core state” per customer and channel.
- Compare the two using explicit precedence and timing rules.
- Generate a replayable remediation delta for the records that need to be fixed.
- Gate the export if counts spike beyond an expected range.
- Export the patch set and keep an audit trail for investigations.
That separation between raw history, curated state, mismatch audit, and remediation output made the pipeline easier to reason about and much safer to rerun.
Define the Mismatch First
The first thing I learned was that a mismatch cannot be defined as “two raw fields are different.”
That is too naive for preference data.
The effective state depends on more than the raw value. You need to account for recency, scope, product applicability, and identity mapping. In my case, I normalized records into the same shape before comparing them:
customer_key | channel | preference_state | effective_from_ts | source_system | updated_ts
Once both inputs are reduced to the same contract, the reconciliation logic becomes deterministic instead of interpretive. That matters when people eventually ask, six months later, why a record was classified a certain way.
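As a minimal sketch of that contract, the snippet below maps a raw row onto the six normalized columns. The raw field names, the `normalize_record` helper, and the choice to upper-case channel and state values are assumptions for illustration, not the production mapping.

```python
from dataclasses import dataclass
from datetime import datetime

VALID_STATES = {"OPT_IN", "OPT_OUT"}


@dataclass(frozen=True)
class PreferenceRecord:
    """One row in the shared comparison contract."""
    customer_key: str
    channel: str
    preference_state: str
    effective_from_ts: datetime
    source_system: str
    updated_ts: datetime


def normalize_record(raw: dict, source_system: str) -> PreferenceRecord:
    """Map a raw row (hypothetical field names) onto the shared contract."""
    state = raw["preference_state"].strip().upper()
    if state not in VALID_STATES:
        # Reject unknown values instead of guessing what the customer meant.
        raise ValueError(f"Unknown preference state: {raw['preference_state']!r}")
    return PreferenceRecord(
        customer_key=raw["customer_key"].strip(),
        channel=raw["channel"].strip().upper(),
        preference_state=state,
        effective_from_ts=raw["effective_from_ts"],
        source_system=source_system,
        updated_ts=raw["updated_ts"],
    )
```

Because both sources pass through the same function, a later comparison only ever sees values drawn from one vocabulary.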
Data Model: Keep the Raw Layer Boring
I kept staging append-only.
That was deliberate. When upstream feeds are messy, the worst thing you can do is overwrite your first trace of what happened. I wanted raw history preserved exactly as it arrived, even if timestamps were inconsistent or events showed up late.
-- Staging is append-only, so create the tables once.
-- CREATE OR REPLACE would drop the accumulated history on a rerun.
create table if not exists stg_privacy_events (
    customer_key     varchar,
    channel          varchar,
    preference_state varchar,        -- 'OPT_IN' or 'OPT_OUT'
    event_ts         timestamp_ntz,
    ingested_ts      timestamp_ntz default current_timestamp(),
    source_record_id varchar
);

create table if not exists stg_core_preferences (
    customer_key     varchar,
    channel          varchar,
    preference_state varchar,
    updated_ts       timestamp_ntz,
    snapshot_date    date,
    ingested_ts      timestamp_ntz default current_timestamp()
);
From there, I derived the latest privacy state per customer and channel.
create or replace table curated_privacy_latest as
select
    customer_key,
    channel,
    preference_state,
    event_ts as updated_ts
from (
    select
        customer_key,
        channel,
        preference_state,
        event_ts,
        ingested_ts,
        row_number() over (
            partition by customer_key, channel
            order by event_ts desc, ingested_ts desc
        ) as rn
    from stg_privacy_events
)
where rn = 1;
The tie-break on ingested_ts was necessary. We saw late-arriving events and duplicate event timestamps in the dataset, and ordering on event_ts alone gave us unpredictable results.
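The same selection rule can be sketched in plain Python to show why the secondary sort makes the result independent of arrival order. This is an illustration over in-memory dicts shaped like the staging columns, not the Snowflake implementation; `latest_per_key` is a hypothetical helper name.

```python
from datetime import datetime
from typing import Dict, Iterable, Tuple


def latest_per_key(events: Iterable[dict]) -> Dict[Tuple[str, str], dict]:
    """Keep one event per (customer_key, channel): the highest event_ts,
    with ingested_ts as the tie-break when event timestamps collide."""
    best_rank: Dict[Tuple[str, str], Tuple[datetime, datetime]] = {}
    latest: Dict[Tuple[str, str], dict] = {}
    for event in events:
        key = (event["customer_key"], event["channel"])
        rank = (event["event_ts"], event["ingested_ts"])
        if key not in best_rank or rank > best_rank[key]:
            best_rank[key] = rank
            latest[key] = event
    return latest
```

If two events share both timestamps, the outcome still depends on input order. In that case a third sort key such as source_record_id would be needed to keep the result stable.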
Build the Mismatch Report with Explicit Priority
The mismatch report is where reviewers are most likely to challenge the logic, so the classification order has to be explicit. I check conditions in the following order:
- Missing record conditions
- True state disagreement
- Staleness, where the values match but the core record is behind
That order is intentional. We check for presence first, because there is nothing to compare against if one side is missing. State difference comes next, because it is a stronger discrepancy than staleness. We then check for staleness, but only when both records exist and the effective state is otherwise aligned.
create or replace table audit_preference_mismatches as
with core_latest as (
    -- Latest core row per customer and channel, using the same tie-break as the privacy side.
    select
        customer_key,
        channel,
        preference_state,
        updated_ts
    from stg_core_preferences
    qualify row_number() over (
        partition by customer_key, channel
        order by updated_ts desc, ingested_ts desc
    ) = 1
),
classified as (
    select
        coalesce(p.customer_key, c.customer_key) as customer_key,
        coalesce(p.channel, c.channel) as channel,
        p.preference_state as expected_state,
        p.updated_ts as expected_updated_ts,
        c.preference_state as core_state,
        c.updated_ts as core_updated_ts,
        case
            when p.customer_key is null then 'MISSING_IN_PRIVACY'
            when c.customer_key is null then 'MISSING_IN_CORE'
            when p.preference_state <> c.preference_state then 'STATE_DIFF'
            when p.updated_ts > c.updated_ts then 'STALE_IN_CORE'
            else 'OK'
        end as mismatch_type
    from curated_privacy_latest p
    full outer join core_latest c
        on p.customer_key = c.customer_key
        and p.channel = c.channel
)
-- QUALIFY is meant for window-function filters, so the final filter is a plain WHERE.
select *
from classified
where mismatch_type <> 'OK';
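The classification order is easier to review and unit-test when it is also written as a small pure function. The sketch below mirrors the CASE expression in Python; `LatestState` and `classify` are hypothetical names used only for illustration.

```python
from datetime import datetime
from typing import NamedTuple, Optional


class LatestState(NamedTuple):
    preference_state: str
    updated_ts: datetime


def classify(privacy: Optional[LatestState], core: Optional[LatestState]) -> str:
    """Apply the checks in priority order: presence, state, staleness."""
    # 1. Presence: with one side missing there is nothing to compare.
    if privacy is None:
        return "MISSING_IN_PRIVACY"
    if core is None:
        return "MISSING_IN_CORE"
    # 2. State disagreement outranks staleness.
    if privacy.preference_state != core.preference_state:
        return "STATE_DIFF"
    # 3. Staleness: states agree, but the core record is behind.
    if privacy.updated_ts > core.updated_ts:
        return "STALE_IN_CORE"
    return "OK"
```

A handful of table-driven tests against a function like this can serve as a lightweight regression check whenever someone proposes changing the precedence.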
For product-specific exceptions, I kept the scope logic out of the main query and pushed it into a reference table.
create or replace table ref_channel_product_scope (
    product_code  varchar,
    channel       varchar,
    is_applicable boolean
);
That keeps the reconciliation step from turning into one giant, unreadable query of nested conditions, which pays off in maintainability.
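How the scope table gets applied is not shown above, so the following is only a sketch of one possible lookup. It assumes each mismatch row carries a product_code, and it assumes that a product and channel pair with no reference row is treated as applicable. Both are illustrative choices, and the helper names are hypothetical.

```python
def build_scope_index(scope_rows):
    """Index reference rows by (product_code, channel) for constant-time lookup."""
    return {
        (row["product_code"], row["channel"]): row["is_applicable"]
        for row in scope_rows
    }


def is_in_scope(scope_index, product_code, channel, default=True):
    """Return the configured applicability, or the default when no row exists."""
    return scope_index.get((product_code, channel), default)


def filter_in_scope(mismatches, scope_index):
    """Drop mismatches for product and channel pairs marked not applicable."""
    return [
        m for m in mismatches
        if is_in_scope(scope_index, m["product_code"], m["channel"])
    ]
```

The default deserves a deliberate decision: treating unknown pairs as applicable surfaces more mismatches, while treating them as out of scope hides them until the reference table is updated.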
Fix Forward with Replayable Patch Sets
A mismatch report by itself is not a control. It is an observation. What makes it useful is the closed loop: generating the exact updates the downstream system needs to apply, preserving the generated payload, and making reruns safe.
create or replace table remediation_core_updates as
select
    customer_key,
    channel,
    expected_state as new_preference_state,
    expected_updated_ts as effective_ts,
    current_timestamp() as generated_ts
from audit_preference_mismatches
where mismatch_type in ('STATE_DIFF', 'STALE_IN_CORE')
    and expected_state is not null;
Then add an idempotency hash so the same patch is not sent twice unless the payload changed.
alter table remediation_core_updates add column update_hash varchar;

update remediation_core_updates
set update_hash = sha2(
    customer_key || '|' ||
    channel || '|' ||
    new_preference_state || '|' ||
    to_varchar(effective_ts),
    256
);
That hash did two jobs for us: it made retries safe, and it gave us a stable key for reconciling outbound payloads against acknowledgments from the receiving system.
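On the consuming side, the dedupe step can be sketched as follows. The function names and the idea of a set of acknowledged hashes are assumptions for illustration. The Python digest would only match the Snowflake one if the timestamp is rendered to exactly the same string, so in practice the hash would be read from the export rather than recomputed.

```python
import hashlib


def update_hash(customer_key: str, channel: str, new_state: str, effective_ts: str) -> str:
    """SHA-256 over the pipe-delimited payload fields, hex encoded."""
    payload = "|".join([customer_key, channel, new_state, effective_ts])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def unsent_patches(patches, acknowledged_hashes):
    """Keep only patches whose hash has not already been acknowledged downstream."""
    return [p for p in patches if p["update_hash"] not in acknowledged_hashes]
```

A retry that regenerates an identical payload produces the same hash and is filtered out, while a changed state or effective timestamp produces a new hash and goes through.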
Airflow as a Control Layer
Airflow wasn’t there just to run SQL on a schedule. It was the control surface around the pipeline. I wanted a run history, retry behavior, gating, and one place to stop a bad export before it reached the downstream system.
Here’s a simplified version of the DAG:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Export is blocked when the mismatch count exceeds this value.
MISMATCH_SPIKE_THRESHOLD = 50000

DEFAULT_ARGS = {
    "owner": "data-platform",
    "depends_on_past": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=10),
}


def fetch_mismatch_count(**context):
    """Count rows in the mismatch audit table and share the result via XCom."""
    hook = SnowflakeHook(snowflake_conn_id="snowflake_default")
    result = hook.get_first(
        "select count(*) from audit_preference_mismatches"
    )
    count_value = int(result[0] or 0)
    context["ti"].xcom_push(key="mismatch_count", value=count_value)


def fail_if_spike(**context):
    """Fail the run, and therefore skip the export, when the count spikes."""
    mismatch_count = int(context["ti"].xcom_pull(
        key="mismatch_count",
        task_ids="count_mismatches",
    ) or 0)
    if mismatch_count > MISMATCH_SPIKE_THRESHOLD:
        raise AirflowException(
            f"Mismatch spike detected: {mismatch_count}"
        )


with DAG(
    dag_id="privacy_preference_reconciliation",
    start_date=datetime(2023, 1, 1),
    schedule_interval="0 6 * * *",  # Airflow 2.x style; newer releases use `schedule`
    catchup=False,
    max_active_runs=1,  # never overlap runs when upstream ingestion lags
    default_args=DEFAULT_ARGS,
) as dag:
    build_latest = SnowflakeOperator(
        task_id="build_curated_privacy_latest",
        snowflake_conn_id="snowflake_default",
        sql="sql/build_curated_privacy_latest.sql",
    )
    build_mismatches = SnowflakeOperator(
        task_id="build_mismatch_report",
        snowflake_conn_id="snowflake_default",
        sql="sql/build_audit_preference_mismatches.sql",
    )
    count_mismatches = PythonOperator(
        task_id="count_mismatches",
        python_callable=fetch_mismatch_count,
    )
    gate = PythonOperator(
        task_id="gate_on_mismatch_spike",
        python_callable=fail_if_spike,
    )
    build_remediation = SnowflakeOperator(
        task_id="build_remediation_delta",
        snowflake_conn_id="snowflake_default",
        sql="sql/build_remediation_core_updates.sql",
    )
    # skip_header applies to loading, not unloading; header = true writes column names.
    export_delta = SnowflakeOperator(
        task_id="export_delta_to_stage",
        snowflake_conn_id="snowflake_default",
        sql="""
            copy into @privacy_stage/core_updates/
            from (
                select
                    customer_key,
                    channel,
                    new_preference_state,
                    effective_ts,
                    update_hash,
                    generated_ts
                from remediation_core_updates
            )
            file_format = (type = csv field_delimiter = ',')
            header = true
            overwrite = true;
        """,
    )

    build_latest >> build_mismatches >> count_mismatches >> gate
    gate >> build_remediation >> export_delta
Two details mattered a lot in practice.
First, max_active_runs=1 kept runs from overlapping when upstream ingestion lagged. Second, the gate blocked large exports when counts broke away from baseline.
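The simplified DAG uses a fixed limit. A gate that follows the baseline could be sketched as below. The median-based rule, the multiplier, and the floor are illustrative assumptions rather than the values used in production, and `should_block_export` is a hypothetical helper.

```python
from statistics import median


def should_block_export(current_count, recent_counts, multiplier=3.0, floor=1000):
    """Block when the current count exceeds a multiple of the recent median.

    `floor` keeps the gate from tripping on tiny absolute numbers once the
    steady-state count is low, and acts as the limit when no history exists.
    """
    if not recent_counts:
        return current_count > floor
    threshold = max(floor, multiplier * median(recent_counts))
    return current_count > threshold
```

Inside the gate task, the recent counts would come from a small run-history table, and a blocking result would raise the same AirflowException as the fixed-threshold version.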
Security and Compliance Details that Matter in Implementation
Since this pipeline touched customer preference data, I confined the compliance discussion to engineering decisions instead of policy language. Three choices mattered:
- PII minimization: the reconciliation tables used canonical keys, channel, state, and timestamps. They didn’t need extra customer attributes for mismatch detection.
- Access control: write access to remediation tables and export stages was restricted to the pipeline role. Investigators could read audit tables but not generate or alter patches.
- Retention and auditability: raw staging history stayed queryable for investigations, while generated delta files and mismatch snapshots were retained long enough to support remediation review and downstream acknowledgement checks.
That is what turned the job into a control instead of a periodic cleanup script.
What Actually Caused the Mismatches
The biggest failures were not fancy SQL problems. They were operational.
Key drift was one. Customer identifiers varied across systems and product lines. Without a canonical customer_key, you end up manufacturing false mismatches.
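One way to keep key drift from turning into false mismatches is to resolve every source identifier through an explicit mapping and to set aside anything that does not resolve. The sketch below assumes a hypothetical mapping keyed by (source_system, source_id); the real identity model is not described here.

```python
def resolve_keys(rows, key_map):
    """Attach the canonical customer_key, separating rows that cannot be mapped.

    key_map: dict from (source_system, source_id) to canonical customer_key.
    Returns (resolved_rows, unmapped_rows).
    """
    resolved, unmapped = [], []
    for row in rows:
        canonical = key_map.get((row["source_system"], row["source_id"]))
        if canonical is None:
            # Do not guess: unmapped rows are a mapping gap, not a preference mismatch.
            unmapped.append(row)
        else:
            resolved.append({**row, "customer_key": canonical})
    return resolved, unmapped
```

Routing unmapped rows to their own queue keeps mapping gaps visible without inflating the mismatch counts.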
Timestamps were another. One source treated “last updated” as “last processed.” Another used local time with inconsistent normalization. I standardized timestamps on ingest and still kept ingestion time available as a tie-break.
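Standardizing on ingest can be as small as the sketch below, which converts a source timestamp to naive UTC so it lines up with a timestamp_ntz column. The function name is hypothetical, and the source zone is passed in explicitly because each feed's convention has to be known rather than inferred.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def to_utc_naive(ts: datetime, source_tz: tzinfo) -> datetime:
    """Interpret naive timestamps in the feed's zone, then convert to naive UTC."""
    if ts.tzinfo is None:
        # The feed sent local wall-clock time with no offset attached.
        ts = ts.replace(tzinfo=source_tz)
    return ts.astimezone(timezone.utc).replace(tzinfo=None)


# Example: a feed that reports wall-clock time at a fixed UTC-5 offset.
feed_tz = timezone(timedelta(hours=-5))
normalized = to_utc_naive(datetime(2024, 1, 15, 9, 0), feed_tz)
```

For feeds that observe daylight saving time, a named zone from the standard-library zoneinfo module could be passed instead of a fixed offset.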
Late-arriving events also mattered. If you trust ingest time alone, you can violate customer intent. If you trust event time alone with no secondary sort, you get unstable results.
Then there was precedence. A global do-not-contact has to override a channel-level opt-in. That rule must live in the data model or reconciliation logic, not in downstream interpretation.
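A minimal sketch of that precedence rule follows. It assumes the global do-not-contact is stored as a row under a special channel value, here the hypothetical marker "ALL"; the actual representation may differ.

```python
GLOBAL_CHANNEL = "ALL"  # hypothetical marker for a global do-not-contact row


def apply_precedence(latest_by_channel: dict) -> dict:
    """Return the effective state per channel for one customer.

    A global OPT_OUT overrides every channel-level value; otherwise each
    channel keeps its own latest state.
    """
    global_opt_out = latest_by_channel.get(GLOBAL_CHANNEL) == "OPT_OUT"
    effective = {}
    for channel, state in latest_by_channel.items():
        if channel == GLOBAL_CHANNEL:
            continue
        effective[channel] = "OPT_OUT" if global_opt_out else state
    return effective
```

Applying the rule before comparison means both datastores are reconciled on effective state, so a channel-level opt-in can never mask a global opt-out.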
Outcome
After the initial backfill and remediation cycle, mismatch counts dropped from the hundreds of thousands to under a hundred. At that point, the remaining cases were usually real data issues: mapping gaps, bad upstream records, or occasional race conditions.
More important than the first cleanup was the steady-state posture after that:
- daily reconciliation
- spike alerts
- replayable deltas
- queryable audit history
That was the difference between a one-time repair and an operating control.
Final Checklist
If you are building this kind of control, start here:
- Define mismatch as effective-state disagreement, not raw field inequality.
- Keep raw history separate from curated latest-state tables.
- Design for late events and timestamp ties.
- Encode precedence rules explicitly.
- Generate replayable remediation payloads.
- Put a gate in front of outbound updates.
- Preserve an audit trail that an investigator can actually query.
The first hard problem is the mismatch definition. The second is the canonical key. Once those are right, the rest becomes much more ordinary engineering.