Custom SCD2 Implementation Using PySpark

Learn to implement Slowly Changing Dimension Type 2 (SCD2) in a data warehouse for tracking historical data, ensuring data integrity, and enabling scalability.

By Abhishek Trehan · Jan. 13, 25 · Tutorial

A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time in a data warehouse. Maintaining SCDs is considered one of the most critical ETL tasks for tracking the history of dimension records.

SCD Type 2 (SCD2) preserves the history of changes by adding a new row for each change instead of overwriting the existing one. If a customer changes their address, for example, or any other attribute, an SCD2 allows analysts to link facts back to the customer and their attributes as they were at the time of the fact event.

The following diagram illustrates a star schema with a Sale fact table and Customer dimension table, which is managed as an SCD2 table.

Figure: A star schema with a Sale fact table and Customer dimension table
Let’s take a closer look at the Customer dimension table schema. You can categorize the columns into four groups:

  • Key: customer_dim_key, also called a surrogate key, has a unique value generated automatically. It’s used as a foreign key for the sale fact table.
  • Attributes: customer_id, first_name, last_name, city, and country have a business value used in business intelligence (BI) reports.
  • SCD2 metadata: eff_start_date, eff_end_date, and is_current manage the state of the record. eff_start_date and eff_end_date bound the time interval during which the record is effective, and is_current flags the version that is currently active.
  • Metadata: timestamp is the actual time when the customer record was generated.
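To make the column groups concrete, here is a minimal plain-Python sketch (not PySpark) of two versions of one customer and a point-in-time lookup. Only the column names come from the schema above. The `CustomerRow` class, the `version_at` helper, the sample values, and the convention that an `eff_end_date` of `None` marks the open-ended current row are all assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass(frozen=True)
class CustomerRow:
    customer_dim_key: int          # surrogate key, referenced by the fact table
    customer_id: str               # business key from the source system
    city: str                      # tracked attribute
    eff_start_date: date           # first day this version is effective
    eff_end_date: Optional[date]   # assumption: None means "still open"
    is_current: bool


# Hypothetical history: customer C1 moved from Austin to Denver on 2024-06-01.
customer_dim = [
    CustomerRow(1, "C1", "Austin", date(2023, 1, 1), date(2024, 5, 31), False),
    CustomerRow(2, "C1", "Denver", date(2024, 6, 1), None, True),
]


def version_at(rows, customer_id, on_date):
    """Return the version of a customer that was effective on on_date, or None."""
    for row in rows:
        if row.customer_id != customer_id:
            continue
        ended = row.eff_end_date is not None and on_date > row.eff_end_date
        if row.eff_start_date <= on_date and not ended:
            return row
    return None


# A sale made in March 2024 resolves to the Austin version of the customer.
sale_row = version_at(customer_dim, "C1", date(2024, 3, 15))
print(sale_row.customer_dim_key, sale_row.city)
```

Because the fact row stores the surrogate key of the version that was effective at sale time, reports keep showing the attributes the customer had when the sale happened.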

Let's look at the code to see how you can build a custom SCD2 class and apply changes quickly and efficiently. We will call this class ScdTwo.

To find the changes, we will use the __get_dataframe_hash() method. It adds three columns to a DataFrame: a hash of the key columns, a hash of the non-key (data) columns, and a combined hash of both. Comparing these hashes between the current data and the new incoming data is how changes are detected.
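The hashing idea can be sketched in plain Python with `hashlib`, since Spark's `sha2(..., 256)` computes SHA-256. The `row_hashes` helper, the sample rows, and the `||` separator are assumptions for illustration. The sketch also shows why concatenating with an empty separator is risky: different column values can collapse into the same string.

```python
import hashlib


def sha256_hex(text):
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def row_hashes(row, keys, non_keys, sep="||"):
    """Return (key_hash, data_hash, combined_hash) for one record (a dict)."""
    key_hash = sha256_hex(sep.join(str(row[c]) for c in keys))
    data_hash = sha256_hex(sep.join(str(row[c]) for c in non_keys))
    return key_hash, data_hash, key_hash + data_hash


keys = ["customer_id"]
non_keys = ["city", "country"]

old = {"customer_id": "C1", "city": "Austin", "country": "US"}
new = {"customer_id": "C1", "city": "Denver", "country": "US"}

old_key, old_data, old_combined = row_hashes(old, keys, non_keys)
new_key, new_data, new_combined = row_hashes(new, keys, non_keys)

print(old_key == new_key)            # True: same customer
print(old_combined == new_combined)  # False: an attribute changed

# With an empty separator, ("ab", "c") and ("a", "bc") concatenate to the
# same string "abc" and therefore hash identically.
a = {"customer_id": "C1", "city": "ab", "country": "c"}
b = {"customer_id": "C1", "city": "a", "country": "bc"}
collides = row_hashes(a, keys, non_keys, sep="")[1] == row_hashes(b, keys, non_keys, sep="")[1]
distinct = row_hashes(a, keys, non_keys)[1] != row_hashes(b, keys, non_keys)[1]
print(collides, distinct)            # True True
```

Equal key hashes with different combined hashes identify an updated record, while equal combined hashes identify an unchanged one.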

Another method, _get_changes(), uses those hashes to find new, updated, unchanged, and deleted records. New records have the start column (__START_AT by default) set to the update timestamp passed to the class, typically the current date and time. For updated records, the superseded version is expired by setting its end column (__END_AT by default) to one millisecond before the update timestamp, and the incoming version becomes the new row. Unchanged records are left as they are, with no start or end dates updated. Records that were deleted by the source are expired in the same way, with the end column set to one millisecond before the update timestamp.
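The classification rules above can be sketched in plain Python, with lists of dicts standing in for DataFrames and tuple comparison standing in for hash comparison. The `classify` helper and the sample data are hypothetical, and the sketch assumes the current data holds only the active version of each key.

```python
from datetime import datetime, timedelta


def classify(current, incoming, keys, non_keys):
    """Split records into unchanged, new, updated, and deleted groups."""
    def key_of(row):
        return tuple(row[c] for c in keys)

    def data_of(row):
        return tuple(row[c] for c in non_keys)

    current_by_key = {key_of(r): r for r in current}
    incoming_by_key = {key_of(r): r for r in incoming}

    unchanged, new, updated, deleted = [], [], [], []
    for key, row in incoming_by_key.items():
        if key not in current_by_key:
            new.append(row)                       # key seen for the first time
        elif data_of(row) == data_of(current_by_key[key]):
            unchanged.append(current_by_key[key])  # same key, same attributes
        else:
            updated.append(row)                   # same key, attributes differ
    for key, row in current_by_key.items():
        if key not in incoming_by_key:
            deleted.append(row)                   # key vanished from the source
    return unchanged, new, updated, deleted


current = [
    {"customer_id": "C1", "city": "Austin"},
    {"customer_id": "C2", "city": "Boston"},
    {"customer_id": "C3", "city": "Miami"},
]
incoming = [
    {"customer_id": "C1", "city": "Austin"},   # unchanged
    {"customer_id": "C2", "city": "Chicago"},  # updated
    {"customer_id": "C4", "city": "Seattle"},  # new
]                                              # C3 is missing: deleted

unchanged, new, updated, deleted = classify(current, incoming, ["customer_id"], ["city"])

# Closing time for expired and deleted versions: one millisecond before the
# update timestamp, so the old and new intervals do not overlap.
update_timestamp = datetime(2025, 1, 13, 0, 0, 0)
end_at = update_timestamp - timedelta(milliseconds=1)
print(end_at.isoformat())
```

Closing the old interval slightly before the new one opens means a point-in-time lookup never matches two versions of the same key.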

Below is the detailed code for your reference:

Python

from pyspark.sql.functions import col, concat_ws, expr, sha2


class ScdTwo:
    """Splits incoming records into SCD2 change sets by comparing row hashes.

    Assumption: current_data holds only the active version of each key.
    """

    def __init__(self, current_data, incoming_changes, keys, non_keys, update_timestamp,
                 end_at_col="__END_AT", start_at_col="__START_AT") -> None:
        self.current_data = current_data
        self.incoming_changes = incoming_changes
        self.keys = keys
        self.non_keys = non_keys
        # Parameter names avoid a leading double underscore: inside a class body
        # such names are mangled and cannot be passed as keyword arguments.
        self.end_at_col = end_at_col
        self.start_at_col = start_at_col
        self.hash_length = 256
        self.update_timestamp = update_timestamp

    def __hash_columns(self, columns):
        # SHA-2 over the columns cast to string; the "||" separator keeps
        # ("ab", "c") and ("a", "bc") from producing the same hash.
        return sha2(concat_ws("||", *[col(c).cast("string") for c in columns]), self.hash_length)

    def __get_dataframe_hash(self, df):
        hashed_df = (df.withColumn("data_hash", self.__hash_columns(self.non_keys))
                     .withColumn("key_hash", self.__hash_columns(self.keys)))
        return hashed_df.withColumn("combined_hash", concat_ws("", col("key_hash"), col("data_hash")))

    def _get_changes(self):
        # Keep the full hashed DataFrames; the single-column projections get their own names.
        current_hashed = self.__get_dataframe_hash(self.current_data)
        incoming_hashed = self.__get_dataframe_hash(self.incoming_changes)

        current_key_hash = current_hashed.select("key_hash")
        existing_combined_hash = current_hashed.select("combined_hash")
        new_key_hash = incoming_hashed.select("key_hash")
        new_combined_hash = incoming_hashed.select("combined_hash")

        start_at = expr(f"cast('{self.update_timestamp}' as timestamp)")
        end_at = expr(f"cast('{self.update_timestamp}' as timestamp) - interval 1 milliseconds")

        # Unchanged: key and data hashes are identical on both sides.
        matched_combined_hash = new_combined_hash.intersect(existing_combined_hash)
        matched_data = current_hashed.join(matched_combined_hash, "combined_hash", "inner")
        matched_key_hash = matched_data.select("key_hash")

        # New: keys present only in the incoming data; open them at the update timestamp.
        brand_new_key_hash = new_key_hash.subtract(current_key_hash)
        brand_new_data = (incoming_hashed.join(brand_new_key_hash, "key_hash", "inner")
                          .withColumn(self.start_at_col, start_at))

        # Deleted: keys present only in the current data; close them just before the update.
        deleted_key_hash = current_key_hash.subtract(new_key_hash)
        deleted_data = (current_hashed.join(deleted_key_hash, "key_hash", "inner")
                        .withColumn(self.end_at_col, end_at))

        # Updated: keys present on both sides whose data hash differs.
        updated_key_hash = current_key_hash.join(new_key_hash, "key_hash", "inner").subtract(matched_key_hash)
        updated_data = (incoming_hashed.join(updated_key_hash, "key_hash", "inner")
                        .withColumn(self.start_at_col, start_at))
        # Expired: the superseded versions of the updated keys.
        expired_data = (current_hashed.join(updated_key_hash, "key_hash", "inner")
                        .withColumn(self.end_at_col, end_at))

        hash_cols = ("data_hash", "key_hash", "combined_hash")
        return (matched_data.drop(*hash_cols), brand_new_data.drop(*hash_cols),
                deleted_data.drop(*hash_cols), updated_data.drop(*hash_cols),
                expired_data.drop(*hash_cols))
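
_get_changes() returns five result sets and leaves it to the caller to combine them. One possible way to assemble the next version of the dimension is sketched here in plain Python, with lists of dicts standing in for DataFrames. The `assemble_dimension` helper, the sample rows, and the convention that an open row has an end value of `None` are assumptions, not part of the class.

```python
from datetime import datetime, timedelta


def assemble_dimension(matched, brand_new, deleted, updated, expired, update_timestamp,
                       start_col="__START_AT", end_col="__END_AT"):
    """Combine the five change sets into one list of dimension rows."""
    result = []
    # Unchanged, deleted, and expired rows already carry their interval columns.
    for row in matched + deleted + expired:
        result.append(dict(row))
    # Rows from the incoming batch open a new interval unless already stamped.
    for row in brand_new + updated:
        opened = dict(row)
        opened.setdefault(start_col, update_timestamp)
        opened.setdefault(end_col, None)
        result.append(opened)
    return result


ts = datetime(2025, 1, 13)
closed_at = ts - timedelta(milliseconds=1)
t0 = datetime(2024, 1, 1)

matched = [{"customer_id": "C1", "city": "Austin", "__START_AT": t0, "__END_AT": None}]
brand_new = [{"customer_id": "C4", "city": "Seattle"}]
deleted = [{"customer_id": "C3", "city": "Miami", "__START_AT": t0, "__END_AT": closed_at}]
updated = [{"customer_id": "C2", "city": "Chicago"}]
expired = [{"customer_id": "C2", "city": "Boston", "__START_AT": t0, "__END_AT": closed_at}]

dimension = assemble_dimension(matched, brand_new, deleted, updated, expired, ts)
open_rows = sorted(r["customer_id"] for r in dimension if r["__END_AT"] is None)
print(open_rows)
```

In PySpark, the analogous step would be a union of the returned DataFrames, for example with `unionByName`, once their columns are aligned.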


Conclusion

Implementing Slowly Changing Dimension Type 2 (SCD2) from scratch offers flexibility, customization, and a deep understanding of data management processes. While building your own SCD2 solution requires careful design and coding, it allows for tailored logic specific to business needs, including custom update strategies, historical tracking, and performance optimization.

Some key takeaways include:

  • Data integrity: Accurate historical tracking ensures reliable data analysis.
  • Customization: Business-specific rules can be integrated seamlessly.
  • Scalability: Well-designed SCD2 implementations scale with growing datasets.

However, be mindful of potential challenges such as data skew, performance bottlenecks, and maintenance overhead. Combining best practices like partitioning, indexing, and incremental processing can help overcome these hurdles.
