Custom SCD2 Implementation Using PySpark

Learn to implement Slowly Changing Dimension Type 2 (SCD2) in a data warehouse for tracking historical data, ensuring data integrity, and enabling scalability.

By Abhishek Trehan · Jan. 13, 25 · Tutorial

A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time in a data warehouse. Tracking the history of dimension records is one of the most critical ETL tasks.

SCD Type 2 (SCD2) preserves the full history of changes. If a customer changes their address, for example, or any other attribute, an SCD2 allows analysts to link facts back to the customer and their attributes in the state they were in at the time of the fact event.
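
For example, suppose a customer moves from Berlin to Paris. The SCD2 Customer dimension keeps both versions of the record (illustrative values; the column names are introduced below):

customer_dim_key | customer_id | city   | eff_start_date | eff_end_date | is_current
---------------- | ----------- | ------ | -------------- | ------------ | ----------
1                | 101         | Berlin | 2024-01-01     | 2024-06-30   | false
2                | 101         | Paris  | 2024-07-01     | 9999-12-31   | true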

The following diagram illustrates a star schema with a Sale fact table and Customer dimension table, which is managed as an SCD2 table.

A star schema with a Sale fact table and Customer dimension table


Let’s have a deeper look at the Customer dimension table schema. You can categorize the columns into four groups (a schema sketch follows the list):

  • Key: customer_dim_key, also called a surrogate key, has a unique value generated automatically. It’s used as a foreign key for the sale fact table.
  • Attributes: customer_id, first_name, last_name, city, and country have a business value used in business intelligence (BI) reports.
  • SCD2 metadata: eff_start_date, eff_end_date, and is_current are designed to manage the state of the record. eff_start_date and eff_end_date contain the time interval when the record is effective.
  • Metadata: timestamp is the actual time when the customer record was generated.
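
As a minimal sketch, the Customer dimension schema above could be declared in PySpark as follows. The column types are assumptions; the article does not specify them:

Python
 
from pyspark.sql.types import (BooleanType, DateType, LongType, StringType,
                               StructField, StructType, TimestampType)

# Illustrative Customer dimension schema; all column types are assumptions.
customer_dim_schema = StructType([
    StructField("customer_dim_key", LongType(), False),  # surrogate key
    StructField("customer_id", StringType(), False),     # business key
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("eff_start_date", DateType(), True),     # SCD2: effective from
    StructField("eff_end_date", DateType(), True),       # SCD2: effective to
    StructField("is_current", BooleanType(), True),      # SCD2: current-row flag
    StructField("timestamp", TimestampType(), True),     # record creation time
])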

Let's look at the code to see how you can build your own SCD2 class and apply changes quickly and efficiently. Let's call this class ScdTwo.

To find the changes, we will use the __get_dataframe_hash() method. It adds a key hash, a data hash, and a combined hash of the two to each dataframe; these hashes are used to detect changes between the current data and the new incoming data.

Another method, _get_changes(), identifies new, updated, unchanged, and deleted records. New records get their start date set to the current datetime; for updated records, the superseded row gets its end date set to the current datetime. Unchanged records keep their existing start and end dates. Records deleted by the source are marked as expired, again with the end date set to the current datetime.

Below is the detailed code for your reference:

Python
 
from pyspark.sql.functions import col, concat_ws, expr, sha2


class ScdTwo:
    def __init__(self, current_data, incoming_changes, keys, non_keys, update_timestamp,
                 end_at_col="__END_AT", start_at_col="__START_AT") -> None:
        self.current_data = current_data
        self.incoming_changes = incoming_changes
        self.keys = keys
        self.non_keys = non_keys
        self.end_at_col = end_at_col
        self.start_at_col = start_at_col
        self.hash_length = 256
        self.update_timestamp = update_timestamp

    def __get_dataframe_hash(self, df):
        # key_hash identifies a record; data_hash fingerprints its non-key
        # attributes; combined_hash changes whenever either of them changes.
        hashed_df = (df
                     .withColumn("data_hash", sha2(concat_ws("", *[col(c) for c in self.non_keys]), self.hash_length))
                     .withColumn("key_hash", sha2(concat_ws("", *[col(c) for c in self.keys]), self.hash_length)))
        return hashed_df.withColumn("combined_hash", concat_ws("", hashed_df.key_hash, hashed_df.data_hash))

    def _get_changes(self):
        current_hashed = self.__get_dataframe_hash(self.current_data)
        incoming_hashed = self.__get_dataframe_hash(self.incoming_changes)

        current_key_hash = current_hashed.select("key_hash")
        current_combined_hash = current_hashed.select("combined_hash")

        incoming_key_hash = incoming_hashed.select("key_hash")
        incoming_combined_hash = incoming_hashed.select("combined_hash")

        # Expired rows end one millisecond before the new version starts.
        expiry_ts = expr(f"cast('{self.update_timestamp}' as timestamp) - interval 1 milliseconds")

        # Unchanged: the combined hash exists on both sides.
        matched_combined_hash = incoming_combined_hash.intersect(current_combined_hash)
        matched_data = current_hashed.join(matched_combined_hash, "combined_hash", "inner")
        matched_key_hash = matched_data.select("key_hash")

        # New: the key exists only in the incoming data.
        brand_new_key_hash = incoming_key_hash.subtract(current_key_hash)
        brand_new_data = incoming_hashed.join(brand_new_key_hash, "key_hash", "inner")

        # Deleted: the key exists only in the current data; expire the record.
        deleted_key_hash = current_key_hash.subtract(incoming_key_hash)
        deleted_data = (current_hashed.join(deleted_key_hash, "key_hash", "inner")
                        .withColumn(self.end_at_col, expiry_ts))

        # Updated: the key exists on both sides but the data hash differs. The
        # incoming row becomes the new version; the current row is expired.
        updated_key_hash = (current_key_hash.join(incoming_key_hash, "key_hash", "inner")
                            .subtract(matched_key_hash))
        updated_data = incoming_hashed.join(updated_key_hash, "key_hash", "inner")
        expired_data = (current_hashed.join(updated_key_hash, "key_hash", "inner")
                        .withColumn(self.end_at_col, expiry_ts))

        hash_cols = ("data_hash", "key_hash", "combined_hash")
        return (matched_data.drop(*hash_cols),
                brand_new_data.drop(*hash_cols),
                deleted_data.drop(*hash_cols),
                updated_data.drop(*hash_cols),
                expired_data.drop(*hash_cols))
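
As a usage sketch (the table names, key columns, and the final apply step are illustrative assumptions, not part of the article):

Python
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, to_timestamp

spark = SparkSession.builder.getOrCreate()

current_dim = spark.table("dim_customer")  # existing SCD2 dimension (assumed name)
staged = spark.table("stg_customer")       # latest source extract (assumed name)

scd = ScdTwo(current_data=current_dim,
             incoming_changes=staged,
             keys=["customer_id"],
             non_keys=["first_name", "last_name", "city", "country"],
             update_timestamp="2025-01-13 00:00:00")

unchanged, new_records, deleted, updated, expired = scd._get_changes()

# One possible apply step: stamp the effective-start timestamp on new and
# updated rows, then union everything into the next version of the dimension.
stamp = to_timestamp(lit("2025-01-13 00:00:00"))
next_dim = (unchanged
            .unionByName(expired, allowMissingColumns=True)
            .unionByName(deleted, allowMissingColumns=True)
            .unionByName(new_records.withColumn("__START_AT", stamp),
                         allowMissingColumns=True)
            .unionByName(updated.withColumn("__START_AT", stamp),
                         allowMissingColumns=True))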


Conclusion

Implementing Slowly Changing Dimension Type 2 (SCD2) from scratch offers flexibility, customization, and a deep understanding of data management processes. While building your own SCD2 solution requires careful design and coding, it allows for tailored logic specific to business needs, including custom update strategies, historical tracking, and performance optimization.

Some key takeaways include:

  • Data integrity: Accurate historical tracking ensures reliable data analysis.
  • Customization: Business-specific rules can be integrated seamlessly.
  • Scalability: Well-designed SCD2 implementations scale with growing datasets.

However, be mindful of potential challenges such as data skew, performance bottlenecks, and maintenance overhead. Combining best practices like partitioning, indexing, and incremental processing can help overcome these hurdles.
