Data Pipeline Observability: Why Your AI Model Fails in Production

Your machine learning model had 95% accuracy in testing, but crashes in production. The problem isn't the model, it's your data pipeline.

By Abhilash Rao Mesala · Jun. 26, 26 · Analysis

The 3:00 AM Incident That Changed Everything

It was a Tuesday morning when the alerts started firing. Our recommendation engine, the one that drives 30% of our revenue, had tanked. Accuracy dropped from 94% to 58% overnight. The data science team immediately blamed the model. They started tweaking hyperparameters, re-training on new data, and running diagnostics. Nothing worked. 

I got pulled into the war room at 3:00 AM. The first thing I asked wasn't "What's wrong with the model?" It was "What changed in the data pipeline?"

Turns out, everything.

A vendor had pushed a schema change upstream. A field that used to be required became optional. Null values started flowing through our pipeline. Our feature engineering code didn't handle nulls gracefully; it just propagated them downstream. By the time the data reached the model, 40% of our feature vectors were corrupted.

The model wasn't broken. The data was.

We spent six hours manually rolling back the schema change, re-running the pipeline, and restoring service. The incident report was brutal: with no data validation in place, we caught a breaking change too late.

That's when I realized we needed observability in our data pipeline, not just in our models.

The Problem: Data Quality is Invisible Until It Breaks

Here's the uncomfortable truth about data pipelines: they fail silently.

Your ETL job completes successfully. Your Spark cluster finishes transformations. Your data warehouse loads without errors. Everything looks green in the monitoring dashboard. But the data itself? Garbage in, garbage out.

There are three categories of failures that break AI models in production:

  • Missing Values: A source system stops populating a field. Your pipeline doesn't validate it. The model gets NaN values it never saw during training. Predictions become random noise.
  • Schema Changes: An upstream team adds a new column, renames an existing one, or changes data types. Your pipeline doesn't expect these changes. Either it crashes, or worse, it silently maps data to the wrong columns.
  • Distribution Shifts: The statistical properties of your data change. A field that was always between 0 and 100 suddenly has values of 50,000. Your model's scaling assumptions break. Predictions become nonsensical.

None of these show up in traditional infrastructure monitoring. Your CPU is fine. Memory is fine. Network is fine. But your data is on fire.
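
To make the silent-failure point concrete, here is a minimal sketch in pandas. The build_features function, the column names, and the sample values are all hypothetical and only illustrate the pattern. The step finishes without raising, yet its output already carries a missing value and a value far outside the range the scaling assumes.

```python
import pandas as pd


def build_features(events: pd.DataFrame) -> pd.DataFrame:
    """Toy feature step (hypothetical): scale event_value assuming a 0-100 range."""
    features = pd.DataFrame({"user_id": events["user_id"]})
    # No validation here: NaN and out-of-range inputs pass straight through.
    features["feature_value_scaled"] = events["event_value"] / 100.0
    return features


# Illustrative batch: one missing value and one value far above the usual range.
events = pd.DataFrame(
    {"user_id": [1, 2, 3, 4], "event_value": [10.0, None, 50000.0, 40.0]}
)

features = build_features(events)  # completes without any error

# Only an explicit look at the data reveals the problem.
null_count = int(features["feature_value_scaled"].isna().sum())
out_of_range_count = int((features["feature_value_scaled"] > 1.0).sum())
print(f"nulls={null_count}, out_of_range={out_of_range_count}")
```

Nothing in the job's exit status distinguishes this run from a healthy one, which is why the checks have to inspect the data itself.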

The Solution: Observability at Every Layer

I started building a three-layer observability framework using dbt, Great Expectations, and custom validation logic. The goal was simple: catch data quality issues before they reach the model.

Layer 1: dbt Tests (The First Line of Defense)

dbt tests are your cheapest, fastest way to catch obvious data quality issues. They run after every transformation and fail the entire pipeline if something's wrong.
Here's what we implemented:
YAML
 
# models/staging/stg_user_events.yml
version: 2

models:
  - name: stg_user_events
    columns:
      - name: user_id
        tests:
          - not_null
          - unique
      - name: event_timestamp
        tests:
          - not_null
          # At column level, dbt_utils prepends the column name to the expression
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp()"
      - name: event_value
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "> 0"

These tests are simple but powerful. They catch:
  • Missing required fields (not_null)
  • Duplicate records (unique)
  • Impossible values (event_timestamp in the future)
  • Out-of-range values (zero or negative event_value)

We run these tests on every pipeline run, using dbt test (or dbt build) right after the models are built. If any test fails, the pipeline stops. No data reaches the model. No silent corruption.

The beauty of dbt tests is that they're version-controlled, documented, and part of your transformation code. When a schema change happens, you update the test, commit it, and everyone knows what changed.
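
One way to get the "pipeline stops" behavior from a plain orchestration script is to check each step's exit code, since dbt exits with a non-zero status when a test fails. This is a minimal sketch under that assumption. The names run_step, run_pipeline, and PIPELINE_STEPS are hypothetical, and score_model.py is a placeholder for whatever consumes the data downstream.

```python
import subprocess
import sys
from typing import Sequence


class PipelineStepFailed(RuntimeError):
    """Raised when a pipeline step exits with a non-zero status."""


def run_step(command: Sequence[str]) -> None:
    """Run one pipeline step and raise if it fails."""
    completed = subprocess.run(list(command), check=False)
    if completed.returncode != 0:
        raise PipelineStepFailed(
            f"step {' '.join(command)!r} exited with code {completed.returncode}"
        )


def run_pipeline(steps: Sequence[Sequence[str]]) -> None:
    """Run steps in order; the first failure aborts everything after it."""
    for step in steps:
        run_step(step)


# Hypothetical step list: build and test the staging model, then score.
# `dbt build` runs the models and their tests together.
PIPELINE_STEPS = [
    ["dbt", "build", "--select", "stg_user_events"],
    [sys.executable, "score_model.py"],  # placeholder downstream step
]

# Usage: run_pipeline(PIPELINE_STEPS)
```

Because run_pipeline raises on the first failing step, the scoring step never runs against data that failed its tests.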

Layer 2: Great Expectations (The Statistical Validator)

dbt tests catch structural issues. Great Expectations catches statistical anomalies, the subtle shifts that break models.

Here's a real scenario: our user_age column had a distribution of 18-65 for two years. Then one day, we started getting ages of 200, 500, and 1000 because of a data entry bug upstream. Our dbt tests wouldn't catch this because the values are technically valid integers and nothing in those tests bounds the age range. But a Great Expectations range check would.
Python
 
# great_expectations/expectations/user_events_expectations.py
# Written against the legacy (pre-1.0) DataContext / RuntimeBatchRequest API.
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import DataContext

context = DataContext()
context.create_expectation_suite(
    expectation_suite_name="user_events_suite",
    overwrite_existing=True
)

# Assumption: `df` is the Spark DataFrame for the current batch, and the
# batch identifier name matches the one configured on the data connector.
validator = context.get_validator(
    batch_request=RuntimeBatchRequest(
        datasource_name="my_spark_datasource",
        data_connector_name="default_runtime_data_connector",
        data_asset_name="user_events",
        runtime_parameters={"batch_data": df},
        batch_identifiers={"default_identifier_name": "user_events_batch"}
    ),
    expectation_suite_name="user_events_suite"
)

# Expect user_age to be between 18 and 120
validator.expect_column_values_to_be_between(
    column="user_age",
    min_value=18,
    max_value=120
)

# Expect event_value to have a mean between 50 and 200
validator.expect_column_mean_to_be_between(
    column="event_value",
    min_value=50,
    max_value=200
)

# Expect at most 5% missing values in critical columns
validator.expect_column_values_to_not_be_null(
    column="user_id",
    mostly=0.95
)

# Expect the event_type distribution to stay close to historical proportions.
# The weights below are placeholders; replace them with your own historical shares.
validator.expect_column_kl_divergence_to_be_less_than(
    column="event_type",
    partition_object={
        "values": ["click", "view", "purchase"],
        "weights": [0.6, 0.3, 0.1]
    },
    threshold=0.1
)

validator.save_expectation_suite(discard_failed_expectations=False)

Great Expectations runs after dbt tests. It validates:
  • Value ranges (age between 18 and 120)
  • Statistical properties (mean event value between 50 and 200)
  • Null rates (less than 5% missing in critical columns)
  • Distribution shifts (event_type distribution matches historical patterns)
If Great Expectations detects an anomaly, it alerts us. We investigate before the data reaches the model.
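
For intuition about what a distribution-shift check measures, here is a small library-free sketch of KL divergence for a categorical column. It uses one common definition, KL(observed || expected) in nats, and is not meant to reproduce Great Expectations' internal computation. The function name, the historical shares, and the sample batches are all made up for illustration.

```python
import math
from collections import Counter
from typing import Dict, Iterable


def categorical_kl_divergence(
    observed_values: Iterable[str], expected_weights: Dict[str, float]
) -> float:
    """Return KL(observed || expected) in nats for a categorical column."""
    counts = Counter(observed_values)
    total = sum(counts.values())
    if total == 0:
        raise ValueError("observed_values is empty")

    divergence = 0.0
    # Categories with zero observed share contribute nothing, so only
    # the categories that actually appear need to be visited.
    for category, count in counts.items():
        observed_share = count / total
        expected_share = expected_weights.get(category, 0.0)
        if expected_share == 0.0:
            # A category never seen historically: the divergence is unbounded.
            return math.inf
        divergence += observed_share * math.log(observed_share / expected_share)
    return divergence


# Placeholder historical shares and batches; not real data.
historical = {"click": 0.6, "view": 0.3, "purchase": 0.1}
typical_batch = ["click"] * 60 + ["view"] * 30 + ["purchase"] * 10
shifted_batch = ["click"] * 20 + ["view"] * 20 + ["purchase"] * 60

THRESHOLD = 0.1
typical_ok = categorical_kl_divergence(typical_batch, historical) < THRESHOLD
shifted_ok = categorical_kl_divergence(shifted_batch, historical) < THRESHOLD
print(f"typical batch passes: {typical_ok}, shifted batch passes: {shifted_ok}")
```

A batch that matches the historical mix scores zero, while a batch dominated by a different category scores well above the threshold. That single number is what the threshold parameter is compared against.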

Layer 3: Custom Validation (The Domain Expert)

dbt and Great Expectations are generic. Your domain is specific. We added custom validation logic that understands our business.
Python
 
# pipelines/validation/custom_validators.py
import pandas as pd

def validate_feature_engineering(df: pd.DataFrame) -> dict:
    """
    Custom validation for features before they reach the model.
    Returns a dict mapping each check name to its result.
    """
    results = {}
    
    # Validate 1: Feature completeness
    # We need at least 95% of feature values populated
    feature_cols = [col for col in df.columns if col.startswith('feature_')]
    total_cells = len(df) * len(feature_cols)
    # Avoid division by zero: no rows or no feature columns counts as fully missing
    null_rate = float(df[feature_cols].isnull().sum().sum() / total_cells) if total_cells else 1.0
    results['feature_completeness'] = {
        'passed': null_rate < 0.05,
        'null_rate': null_rate,
        'threshold': 0.05
    }
    
    # Validate 2: Feature scaling
    # Normalized features should mostly sit within about 3 sigma;
    # anything at or beyond +/-10 is treated as a scaling failure
    for col in feature_cols:
        max_val = df[col].max()
        min_val = df[col].min()
        results[f'{col}_scaling'] = {
            'passed': bool(max_val < 10 and min_val > -10),
            'max': max_val,
            'min': min_val
        }
    
    # Validate 3: Temporal freshness
    # The most recent event should be less than 30 days old
    if 'event_date' in df.columns:
        # Parse into a local Series so the caller's DataFrame is not modified
        event_dates = pd.to_datetime(df['event_date'])
        days_old = (pd.Timestamp.now() - event_dates.max()).days
        results['temporal_freshness'] = {
            'passed': bool(days_old < 30),
            'days_old': days_old,
            'threshold_days': 30
        }
    
    # Validate 4: Business logic
    # Revenue should never be negative
    if 'revenue' in df.columns:
        negative_revenue = int((df['revenue'] < 0).sum())
        results['business_logic_revenue'] = {
            'passed': negative_revenue == 0,
            'negative_count': negative_revenue
        }
    
    return results

def validate_and_alert(df: pd.DataFrame, validation_results: dict) -> bool:
    """
    Check all validations and alert if any fail.
    Returns True if all pass, False otherwise.
    df is accepted for context only; the checks read validation_results.
    """
    all_passed = True
    
    for check_name, check_result in validation_results.items():
        if not check_result['passed']:
            all_passed = False
            print(f"ALERT: {check_name} failed")
            print(f"Details: {check_result}")
            # Send to monitoring system (Datadog, New Relic, etc.)
            # send_alert(check_name, check_result)
    
    return all_passed

This custom validation runs after Great Expectations. It checks:
  • Feature completeness (at least 95% of feature values populated)
  • Feature scaling (normalized features within a loose sanity bound)
  • Temporal freshness (the newest event is less than 30 days old)
  • Business logic (revenue is never negative)

If any check fails, we block the pipeline and alert the team.
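
Blocking the pipeline can be as simple as raising an exception that the orchestrator treats as a failed task. The sketch below assumes results shaped like the dict returned by the validator above, where each check has a 'passed' key. DataValidationError, block_on_failure, and the example results are hypothetical names and values for illustration.

```python
class DataValidationError(Exception):
    """Raised when at least one validation check did not pass."""


def block_on_failure(validation_results: dict) -> None:
    """Raise if any check failed, so downstream steps never run."""
    failed = sorted(
        name for name, result in validation_results.items() if not result["passed"]
    )
    if failed:
        raise DataValidationError("failed checks: " + ", ".join(failed))


# Hand-written example results in the same shape as the validator output.
example_results = {
    "feature_completeness": {"passed": True, "null_rate": 0.01, "threshold": 0.05},
    "business_logic_revenue": {"passed": False, "negative_count": 3},
}

try:
    block_on_failure(example_results)
except DataValidationError as error:
    print(f"Pipeline blocked: {error}")
```

Raising instead of returning a boolean means a caller cannot forget to check the result. The failure propagates unless someone handles it deliberately.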

The Real-World Gotchas We Discovered

Gotcha 1: Validation Overhead

Running dbt tests, Great Expectations, and custom validation on every pipeline run adds latency. We went from 15-minute runs to 25-minute runs. The trade-off was worth it (catching one data quality issue saved us more time than we lost), but you need to plan for it.

Gotcha 2: False Positives

Great Expectations' distribution shift detection is sensitive. Legitimate business changes (a marketing campaign shifting the user_age distribution, for example) triggered false alerts. We had to tune thresholds carefully and add context to alerts.

Gotcha 3: Schema Changes Are Sneaky

A vendor added a new column to an upstream table. Our pipeline didn't break; it just ignored the new column. But the data science team expected it. We added schema validation to catch new columns and alert us.
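
A schema check of this kind can be sketched as a diff between an expected column-to-dtype mapping and what actually arrived. This is a minimal pandas version under stated assumptions: diff_schema, the EXPECTED_SCHEMA mapping, and the column names are hypothetical. In a real pipeline the expected schema would live in version control next to the tests.

```python
from typing import Dict, List

import pandas as pd


def diff_schema(df: pd.DataFrame, expected: Dict[str, str]) -> Dict[str, List[str]]:
    """Compare a DataFrame's columns and dtypes against an expected schema."""
    actual = {column: str(dtype) for column, dtype in df.dtypes.items()}
    shared = set(actual) & set(expected)
    return {
        "added": sorted(set(actual) - set(expected)),
        "missing": sorted(set(expected) - set(actual)),
        "type_changed": sorted(c for c in shared if actual[c] != expected[c]),
    }


# Hypothetical expected schema for the upstream table.
EXPECTED_SCHEMA = {
    "user_id": "int64",
    "event_value": "float64",
    "event_count": "int64",
}

# Illustrative batch: a new column appeared, one column vanished,
# and event_value now arrives as text instead of a float.
batch = pd.DataFrame(
    {
        "user_id": pd.Series([1, 2], dtype="int64"),
        "event_value": ["10.5", "20.0"],
        "discount_pct": pd.Series([0.1, 0.2], dtype="float64"),
    }
)

schema_diff = diff_schema(batch, EXPECTED_SCHEMA)
print(schema_diff)
```

Any non-empty list in the result is worth an alert, including "added": a new column is harmless to the pipeline but may be exactly what a downstream team is waiting for.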

Gotcha 4: Null Handling Varies

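Python represents a missing value as None, and pandas often as NaN. SQL uses NULL. Spark uses null. When data flows between systems, nulls get lost or misinterpreted, for example when a NULL is exported as the literal string "NULL" or as an empty string. We had to standardize null handling across the entire pipeline.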
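
As an illustration of what standardizing can look like at one boundary, the sketch below converts common textual stand-ins for null into real missing values in pandas. The sentinel list and the normalize_nulls name are assumptions for the example. The right list depends on what your upstream systems actually emit.

```python
import pandas as pd

# Assumed textual stand-ins for null; adjust to match your sources.
NULL_SENTINELS = ["", "NULL", "null", "None", "N/A"]


def normalize_nulls(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy where sentinel strings are replaced by missing values."""
    # isin() flags cells equal to a sentinel; mask() blanks out those cells.
    return df.mask(df.isin(NULL_SENTINELS))


# Illustrative batch mixing a real None with two string sentinels.
raw = pd.DataFrame(
    {
        "country": ["US", "", "NULL", None],
        "age": [30, 41, 52, 63],
    }
)

cleaned = normalize_nulls(raw)
missing_countries = int(cleaned["country"].isna().sum())
print(f"missing country values after normalization: {missing_countries}")
```

After normalization, a single isna() check sees every missing value, so downstream null-rate validations no longer undercount.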

The Framework: A Decision Matrix

Here's how we decide which validation layer to use:

| Issue Type | Caught By | Example | Action |
|---|---|---|---|
| Missing required field | dbt tests | user_id is null | Fail pipeline immediately |
| Duplicate records | dbt tests | Same user_id appears twice | Fail pipeline immediately |
| Impossible values | dbt tests | event_timestamp in future | Fail pipeline immediately |
| Out-of-range values | Great Expectations | age > 150 | Alert, investigate, fail if severe |
| Distribution shift | Great Expectations | event_value mean changes 50% | Alert, investigate, continue if acceptable |
| Business logic violation | Custom validation | revenue is negative | Alert, investigate, fail |
| Schema change | Custom validation | New column added upstream | Alert, investigate, update tests |
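
The matrix can also be encoded as a small lookup so the pipeline applies the same policy every time. This is a simplified sketch: it collapses each row into a default of either failing or alerting, and the "investigate" part stays with humans. The Action enum, the issue-type keys, and decide_action are hypothetical names.

```python
from enum import Enum


class Action(Enum):
    FAIL = "fail pipeline"
    ALERT = "alert and investigate"


# Default action per issue type, following the matrix above.
DEFAULT_ACTIONS = {
    "missing_required_field": Action.FAIL,
    "duplicate_records": Action.FAIL,
    "impossible_values": Action.FAIL,
    "out_of_range_values": Action.ALERT,  # escalates to FAIL when severe
    "distribution_shift": Action.ALERT,  # continue if the shift is acceptable
    "business_logic_violation": Action.FAIL,
    "schema_change": Action.ALERT,
}

# Issue types whose action is upgraded to FAIL when flagged as severe.
ESCALATE_WHEN_SEVERE = {"out_of_range_values"}


def decide_action(issue_type: str, severe: bool = False) -> Action:
    """Look up the action for an issue type, escalating severe cases."""
    if issue_type not in DEFAULT_ACTIONS:
        raise ValueError(f"unknown issue type: {issue_type}")
    if severe and issue_type in ESCALATE_WHEN_SEVERE:
        return Action.FAIL
    return DEFAULT_ACTIONS[issue_type]


print(decide_action("duplicate_records").value)
print(decide_action("out_of_range_values", severe=True).value)
```

Keeping the policy in one mapping makes a change to it a reviewable one-line diff, in the same spirit as version-controlled dbt tests.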

The Results: From Chaos to Confidence

After implementing this three-layer framework:
  • Incident reduction: We went from 2-3 data quality incidents per month to 0 in six months.
  • Time to resolution: When issues do occur, we catch them within minutes instead of hours.
  • Model stability: Model accuracy stopped fluctuating. It's now consistently 93-95%.
  • Team confidence: Data scientists trust the data. Engineers trust the pipeline.

The best part? We caught the next upstream schema change before it turned into an incident. Great Expectations detected the distribution shift, we investigated, found the upstream change, and coordinated with the vendor team before any bad data reached production.

Getting Started: The Minimal Viable Observability

You don't need to implement everything at once. Start here:
  1. Week 1: Add dbt tests for not_null and unique on critical columns.
  2. Week 2: Add Great Expectations checks for value ranges and null rates.
  3. Week 3: Add custom validation for your most important business rules.
  4. Week 4: Set up alerting so you're notified when validations fail.

That's it. You now have observability in your data pipeline.

Conclusion: Observability Saves Models

Your AI model isn't failing because it's bad. It's failing because the data feeding it is bad. And you won't know the data is bad until you look.

The best models in the world can't save you from garbage data. But good observability can. dbt tests, Great Expectations, and custom validation aren't fun. They don't make it into conference talks. But they'll save your production system at 3:00 AM.

Start small. Test early. Validate often.