DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Because the DevOps movement has redefined engineering responsibilities, SREs now have to become stewards of observability strategy.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Related

  • Automating Data Pipelines: Generating PySpark and SQL Jobs With LLMs in Cloudera
  • The Future of Data Lies in Transformer Models vs. Big Data Transformations
  • Automate Private Azure Databricks Unity Catalog Creation
  • Building Product to Learn AI, Part 2: Shake and Bake

Trending

  • Infrastructure as Code (IaC) Beyond the Basics
  • How Large Tech Companies Architect Resilient Systems for Millions of Users
  • Navigating Double and Triple Extortion Tactics
  • What Is Plagiarism? How to Avoid It and Cite Sources
  1. DZone
  2. Data Engineering
  3. Big Data
  4. Self-Healing Data Pipelines: The Next Big Thing in Data Engineering?

Self-Healing Data Pipelines: The Next Big Thing in Data Engineering?

Build self-healing data pipelines on GCP using AI-powered tools like GPT-4 and Python to detect and fix errors automatically.

By 
Naresh Erukulla user avatar
Naresh Erukulla
·
Mar. 27, 25 · Analysis
Likes (0)
Comment
Save
Tweet
Share
7.3K Views

Join the DZone community and get the full member experience.

Join For Free

I'm an enthusiastic data engineer who always looks out for various challenging problems and tries to solve them with a simple POC that everyone can relate to. Recently, I have thought about an issue that most data engineers face daily. I have set alerts on all the batch and streaming data pipelines. When the errors reach a threshold limit or if the data pipeline fails, we get failure notifications immediately in the email inbox. 

Everything seemed fine until I noticed one of our critical datasets could not be loaded into BigQuery. After investigating the error logs, I found several messages with "missing required data." I felt ‘lost’ seeing these frequent raw data issues from a user-inputted file. 

Handling data inconsistencies, notably missing or formatting data issues, can cause significant problems downstream in analytics and operational workflows. There is a downstream report that relies on this ingested data. This report is extensively used for daily reporting on the key metrics of how business reflects in several areas and gives crucial data points to decide how to navigate. In this critical report, all C-level stakeholders rely on presenting metrics and discussing challenges and the go-forward plan.

I spent hours manually inspecting the source CSV files, as this vast file consists of an enormous volume of transactions sourced from another upstream application. Identifying the problematic rows and correcting them is vital. By the time I solved the issues, the deadline had passed, and the stakeholders were understandably frustrated. That day, I realized how fragile traditional data pipelines can be. They break easily, and fixing them multiple times requires manual intervention, which is time-consuming and error-prone.

Have you ever faced a similar situation? Have you spent countless hours debugging data pipelines, only to realize the root cause was a simple formatting error or missing required field? If so, you're not alone. Data engineers worldwide struggle with these challenges daily. But what if there was a way to build pipelines that could "heal" themselves? That's precisely what I set out to achieve. 

How Self-Healing Data Pipeline Works

The idea of a self-healing pipeline is simple: When errors occur during data processing, the pipeline should automatically detect, analyze, and correct them without human intervention. Traditionally, fixing these issues requires manual intervention, which is time-consuming and prone to errors.

There are several ways to idealize this, but using AI agents is the best method and a futuristic approach for data engineers to self-heal failed pipelines and auto-correct them dynamically. In this article, I will show a basic implementation of how to use LLMs like the GPT-4/DeepSeek R1 model to self-heal data pipelines by using LLM’s recommendation on failed records and applying the fix through the pipeline while it is still running. The provided solution can be scaled to large data pipelines and extended to more functionalities by using the proposed method.

Here's how I built a practical pipeline leveraging OpenAI API that uses the GPT-4 model in a cloud environment. The basic steps followed are:

  • Upload a source file into Google Cloud Storage Bucket. You can use any local or any other cloud storage if you don't have access to the GCP cloud.
  • Create the data model for ingesting the original data into the BigQuery table and error records into the Error table.
  • Read the source file from the CSV and identify the Clean dataset and invalid records (error rows) dataset from the input data.
  • Ingest the Clean dataset into BigQuery and pass the Error Rows dataset to LLM using the Prompt.
  • For each Error Row, OpenAI’s GPT API analyzes and provides an intelligent product ID assignment.
  • Google BigQuery to store and retrieve product information dynamically.
  • Python-based automation for seamless integration.

Here is the complete codebase on my GitHub.

1. Reading Input Data from Cloud Storage

The pipeline starts by reading a CSV file uploaded by clients stored in Cloud Storage. We can leverage a Cloud Function — serverless execution of pipeline steps — to trigger whenever a new file is uploaded to the bucket. The function reads the file using the google-cloud-storage library and parses it into a Pandas DataFrame for further processing. 

You can write some data quality checks before passing on to the next step, where the data gets ingested into the BigQuery table. However, data issues are dynamic in real-world scenarios, and you can't predict and write all test cases, which makes the code complex and unreadable.

In our use case, the CSV file contains fields ProductID, Price, name, saleAmount. Below is the sample file with data (also missing data in both ProductID and Price fields).

CSV file containing fields ProductID, Price, name, saleAmount


Python
 
# Read CSV from GCS
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    data = blob.download_as_text()
    df = pd.read_csv(io.StringIO(data))


2. Ingesting Data into BigQuery

Next, the pipeline attempts to load the data into BigQuery. If any rows fail due to schema mismatches, data type errors, or missing fields, they are captured and logged for further analysis. This step is essential to detect the underlying error information that will be used to identify a possible solution by OpenAI API.

Python
 
# Function to clean and preprocess data
def clean_data(df):
    avg_price = get_average_price()
    
    df["Price"] = df["Price"].fillna(avg_price)

    # Log and remove rows with issues
    error_rows = df[df["ProductID"].isna()]
    clean_df = df.dropna(subset=["ProductID"])

    return clean_df, error_rows

# Function to query BigQuery for an average price
def get_average_price():
    client = bigquery.Client()
    query = f"SELECT AVG(Price) AS avg_price FROM `{BQ_PROJECT_ID}.{BQ_DATASET_ID}.Product_Info`"
    
    try:
        df = client.query(query).to_dataframe()
        avg_price = df["avg_price"][0]
        print(f"Fetched Average Price: {avg_price}")
        return avg_price
    except Exception as e:
        print(f"Error fetching average price: {e}")
        return None


Notice that avg_price = get_average_price() is getting it from BigQuery lookup.

After the clean dataset insert:

Clean dataset insert


3. Analyzing Errors With LLMs

Analyzing errors is the key step in the entire pipeline, and this is where the magic happens. The failed records were sent to a large language model (LLM) like GPT-4 or DeepSeek R1 for analysis. The LLM examines the errors and suggests corrections and a fixed record.

For example, suppose a date field is formatted incorrectly. In that case, the LLM might suggest the correct formatted record from a String to an Integer conversion or String to Date/Timestamp conversion and vice versa. In cases where the data is expected but found null, based on the rules enforced by our code, missing values with 'Average' or Default values are fixed to ensure data ingestion is successful.

Implementing the ChatCompletion Request With Retry Mechanism

To ensure resilience, we implement a retry mechanism using tenacity. The function sends error details to GPT and retrieves suggested fixes. In our case, the 'functions' list was created and passed to the JSON payload using the ChatCompletion Request.

Note that the 'functions' list is the list of all functions available to fix the known or possible issues using the Python functions we have created in our pipeline code. GPT analyzes the input prompt and the error message to determine if it is best to invoke a specific function listed on the 'functions' list to fix the issue. 

Accordingly, GPT’s response provides structured data indicating which function should be called. GPT does not execute the function directly. Instead, the pipeline will execute it.

Python
 
@retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3))
def chat_completion_request(messages, functions=None, model=GPT_MODEL):
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + openai.api_key,
    }
    json_data = {"model": model, "messages": messages}
    if functions is not None:
        json_data.update({"functions": functions})
    try:
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=json_data,
        )
        return response.json()
    except Exception as e:
        print("Unable to generate ChatCompletion response")
        print(f"Exception: {e}")
        return e
# Function to send ChatCompletion request to OpenAI API
functions = [
    {
        "name": "assign_product_id",
        "description": "assigning a unique ProductID",
        "parameters": {
            "type": "object",
            "properties": {
                "ProductID": {
                    "type": "integer",
                    "description": "The product ID to assign."
                },
            }
        },
    }
]


assign_product_id is the function listed on the 'functions' list that GPT can call if required. In our case, the ProductID is missing for the last two rows in the CSV file. Hence, the GPT called the specific assign_product_id function to determine the ProductID value.

The function assign_product_id fetches the highest assigned ProductID from BigQuery and increments it for subsequent use. If it runs for the first time or no data is available in the BigQuery table, it will assign the default 99999 as ProductID.

Python
 
def assign_product_id():
    client = bigquery.Client()
    # table_ref = client.dataset(BQ_DATASET_ID).table(BQ_TABLE_ID)

    query = f"""
    Select max(ProductID) as max_id from `{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}` WHERE ProductID < 99999
    """
    df = client
    try:
        df = client.query(query).to_dataframe()
    except Exception as e:
        print(f"Error fetching max ProductID: {e}")
        return None
    return df["max_id"][0] + 1 if not df.empty else 99999


4. Applying Automatic Corrections

The pipeline applies the GPT’s suggestions to the failed records and re-attempts to ingest them into BigQuery. The data is stored in the main table if the corrections are successful. If not, the irreparable records are logged into a separate error table for manual review. 

In instances where the field is required and unique, GPT can get the unique ProductID value from BigQuery and add a plus 1 to it to maintain the uniqueness. Consider cases where there were multiple error rows in the pipeline; each record is processed sequentially with the fix offered by the GPT’s response and updates the Error Record with SUGGESTED values.

In the below code, ProductID gets replaced by the value fetched from assign_product_id() BigQuery table. When there are multiple error rows, each receives a unique number by incrementing the ProductID.

Python
 
# Function to send error data to GPT-4 for analysis
def analyze_errors_with_gpt(error_rows):
    if error_rows.empty:
        return error_rows

    new_product_id = assign_product_id()

    for index, row in error_rows.iterrows():
        prompt = f"""
        Fix the following ProductID by assigning a unique ProductID from the bigquery table Product_Info:
        {row.to_json()}
        """
        chat_response = chat_completion_request(
            model=GPT_MODEL,
            messages=[{"role": "user", "content": prompt}],
            functions=functions
        )

        if chat_response is not None:
            try:
                if chat_response["choices"][0]["message"]:
                    response_content = chat_response["choices"][0]["message"]
                else:
                    print("Chat response content is None")
                    continue
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON response: {e}")
                continue

            if 'function_call' in response_content:
                if response_content['function_call']['name'] == 'assign_product_id':
                    res = json.loads(response_content['function_call']['arguments'])
                    res['product_id'] = new_product_id
                    error_rows.at[index, "ProductID"] = res['product_id']
                    new_product_id += 1  # Increment the ProductID for the next row

                    print(f"Assigned ProductID: {res['product_id']}")
                else:
                    print("Function not supported")
            else:
                chat.add_prompt('assistant', response_content['content'])
        else:
            print("ChatCompletion request failed. Retrying...")

    return error_rows


5. Ingesting the Fixed Rows into BigQuery Table

The main function reads data from Google Cloud Storage (GCS), cleans it, and loads valid data into BigQuery while fixing errors dynamically.

Python
 
# Main function to execute the pipeline
def main():
    bucket_name = "self-healing-91"
    file_name = "query_results.csv"

    # Read CSV from GCS
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    data = blob.download_as_text()
    df = pd.read_csv(io.StringIO(data))

    # Clean data and identify errors
    clean_df, error_rows = clean_data(df)

    # Load valid data into BigQuery
    load_to_bigquery(clean_df, BQ_TABLE_ID)

    # Process errors if any
    if not error_rows.empty:

        # Analyze errors with GPT-4
        error_rows = analyze_errors_with_gpt(error_rows)

        load_to_bigquery(error_rows, BQ_TABLE_ID)

        print("Fixed Errors loaded successfully into BigQuery original table.")


Post error fix data, particularly check rows from 66-68. After fetching the max value as 10000 ProductID from the BigQuery table, they're incremented by one. Also, the Price field with missing information in the error rows was replaced by Avg(Price) from the BigQuery table.


6. Logging and Monitoring

Throughout the process, errors and pipeline activities are logged using Cloud Logging. This ensures that engineers can monitor the pipeline's health and troubleshoot issues.

Tools and Technologies Used

Here are the key tools and technologies I used to build and test this pipeline:

  • Cloud storage: This is for storing input CSV files.
  • Cloud functions: For serverless execution of pipeline steps.
  • BigQuery: This is used to store cleaned data and error logs.
  • GPT-4/DeepSeek R1: For analyzing and suggesting corrections for failed records.
  • Cloud logging: This is for monitoring and troubleshooting.
  • Cloud composer: This is used to orchestrate the pipeline using Apache Airflow. 

Challenges Faced

1. LLM Integration

Integrating an LLM into the pipeline was tricky. I had to ensure that the API calls were efficient and that the LLM’s responses were accurate. Additionally, there were cost considerations, as LLM APIs can be expensive for large datasets. Just know that you must set up an API key for this service. 

For example, for OpenAI, you must visit https://platform.openai.com/ to register and generate the new API key and use it in the pipeline when sending the API call with a prompt.

LLM Integration

2. Error Handling

Designing a robust error-handling mechanism was challenging. I had to account for various errors, from schema mismatches to network issues, and ensure that the pipeline could handle them gracefully. A pipeline could face numerous problems, and all issues can't be solved dynamically. Examples are File is Empty, and BigQuery Table doesn’t exist.

3. Scalability

As the volume of data grew, I had to optimize the pipeline for scalability. This involved partitioning data in BigQuery and using Dataflow for large-scale processing.

4. Cost Management

While GCP offers powerful tools, they come at a cost. I had to carefully monitor usage and optimize the pipeline to avoid unexpected bills. OpenAI API cost is another factor that you need to monitor carefully.

Final Thoughts and Key Notes

Building a self-healing pipeline is a game-changer for data engineers. It reduces manual intervention, improves efficiency, and ensures data quality. However, it’s not a silver bullet. While self-healing pipelines can save time, they come with additional costs, such as LLM API fees and increased Cloud Function usage. It’s essential to weigh these costs against the benefits. 

If you're new to self-healing pipelines, start with a small project — experiment with LLM integration and error handling before scaling up. Regularly monitor your pipeline’s performance and costs. Use tools like Cloud Monitoring and Cloud Logging to identify bottlenecks and optimize accordingly. Finally, work closely with data scientists, analysts, and business stakeholders to understand their needs and ensure the pipeline delivers value when business requirements change.

In conclusion, self-healing pipelines represent the future of data engineering. We can build robust, efficient, intelligent pipelines that minimize downtime and maximize productivity by leveraging tools like GCP and LLMs. Please explore this approach if you've ever struggled with fragile data pipelines. The initial effort is worth the long-term benefits.

What are your thoughts on self-healing pipelines? Have you tried building one? Share your experiences in the comments below.

Cloud storage Pipeline (software) large language model Big data

Opinions expressed by DZone contributors are their own.

Related

  • Automating Data Pipelines: Generating PySpark and SQL Jobs With LLMs in Cloudera
  • The Future of Data Lies in Transformer Models vs. Big Data Transformations
  • Automate Private Azure Databricks Unity Catalog Creation
  • Building Product to Learn AI, Part 2: Shake and Bake

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!