Custom SCD2 Implementation Using PySpark
Learn to implement Slowly Changing Dimension Type 2 (SCD2) in a data warehouse for tracking historical data, ensuring data integrity, and enabling scalability.
A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time in a data warehouse. Managing SCDs is one of the most critical ETL tasks, because it is how the history of dimension records is tracked.
SCD Type 2 (SCD2) preserves the full history of changes. If a customer changes their address, for example, or any other attribute, an SCD2 table allows analysts to link facts back to the customer and their attributes as they were at the time of the fact event.
The following diagram illustrates a star schema with a Sale fact table and a Customer dimension table, which is managed as an SCD2 table.
Let's have a deeper look at the Customer dimension table schema. You can categorize the columns into four different groups:
- Key: customer_dim_key, also called a surrogate key, has a unique value that is generated automatically. It is used as a foreign key in the Sale fact table.
- Attributes: customer_id, first_name, last_name, city, and country carry the business values used in business intelligence (BI) reports.
- SCD2 metadata: eff_start_date, eff_end_date, and is_current manage the state of the record. eff_start_date and eff_end_date define the time interval during which the record is effective.
- Metadata: timestamp is the actual time when the customer record was generated.
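For readers who want something concrete to experiment with, here is a minimal sketch of such a Customer dimension as a PySpark DataFrame. The column names follow the groups above; the two sample rows are hypothetical and show the same customer before and after a city change, with the old row closed out and the new row marked as current:

```python
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, LongType, StringType,
                               DateType, BooleanType, TimestampType)

spark = SparkSession.builder.getOrCreate()

# Hypothetical Customer dimension schema following the column groups above.
customer_dim_schema = StructType([
    StructField("customer_dim_key", LongType(), False),   # surrogate key
    StructField("customer_id", StringType(), False),      # business key
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("eff_start_date", DateType(), True),      # SCD2 metadata
    StructField("eff_end_date", DateType(), True),
    StructField("is_current", BooleanType(), True),
    StructField("timestamp", TimestampType(), True),      # record creation time
])

# Two versions of the same customer: the old row is expired, the new row is current.
rows = [
    (1, "C-100", "Ana", "Silva", "Porto", "Portugal",
     datetime.date(2020, 1, 1), datetime.date(2023, 5, 31), False,
     datetime.datetime(2020, 1, 1, 9, 0)),
    (2, "C-100", "Ana", "Silva", "Lisbon", "Portugal",
     datetime.date(2023, 6, 1), datetime.date(9999, 12, 31), True,
     datetime.datetime(2023, 6, 1, 9, 0)),
]
customer_dim = spark.createDataFrame(rows, customer_dim_schema)
customer_dim.show(truncate=False)
```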
Let's look at the code to see how you can build a custom SCD2 class and apply changes quickly and efficiently. We will call this class ScdTwo.
To find the changes, we use the __get_dataframe_hash() method. It returns a hash of the key columns, a hash of the non-key columns, and a combined hash of both, which is used to detect differences between the current data and the new incoming data.
Another method, _get_changes(), identifies new, updated, unchanged, and deleted records. New records get a start date equal to the current timestamp; for updated records, the previous version has its end date set to the current timestamp. Unchanged records remain as they are, with no start or end dates modified. Records that were deleted at the source are marked as expired, again with the end date set to the current timestamp.
Below is the detailed code for your reference:
from pyspark.sql.functions import col, concat_ws, expr, sha2

class ScdTwo():
    def __init__(self, current_data, incoming_changes, keys, non_keys, update_timestamp,
                 __END_AT_col="__END_AT", __START_AT_col="__START_AT") -> None:
        self.current_data = current_data
        self.incoming_changes = incoming_changes
        self.keys = keys
        self.non_keys = non_keys
        self.__END_AT_col = __END_AT_col
        self.__START_AT_col = __START_AT_col
        self.hashLength = 256
        self.update_timestamp = update_timestamp

    def __get_dataframe_hash(self, df):
        # Hash the non-key (data) columns and the key columns separately,
        # then combine the two so that any change in a record is detectable.
        hashed_df = (df
                     .withColumn("data_hash", sha2(concat_ws('', *[col(c) for c in self.non_keys]), self.hashLength))
                     .withColumn("key_hash", sha2(concat_ws('', *[col(c) for c in self.keys]), self.hashLength)))
        return hashed_df.withColumn("combined_hash", concat_ws('', hashed_df.key_hash, hashed_df.data_hash))

    def _get_changes(self):
        current_hashed = self.__get_dataframe_hash(self.current_data)
        incoming_hashed = self.__get_dataframe_hash(self.incoming_changes)

        current_key_hash = current_hashed.select("key_hash")
        existing_combined_hash = current_hashed.select("combined_hash")
        new_key_hash = incoming_hashed.select("key_hash")
        new_combined_hash = incoming_hashed.select("combined_hash")

        # Unchanged records: the combined hash exists in both the current and the incoming data.
        matched_combined_hash = new_combined_hash.subtract(new_combined_hash.subtract(existing_combined_hash))
        matched_data = current_hashed.join(matched_combined_hash, "combined_hash", "inner")
        matched_key_hash = matched_data.select("key_hash")

        # New records: the key hash appears only in the incoming data; stamp the start date.
        brand_new_key_hash = new_key_hash.subtract(current_key_hash)
        brand_incoming_changes = (incoming_hashed.join(brand_new_key_hash, "key_hash", "inner")
                                  .withColumn(self.__START_AT_col, expr(f"cast('{self.update_timestamp}' as timestamp)")))

        # Deleted records: the key hash appears only in the current data; expire them.
        deleted_key_hash = current_key_hash.subtract(new_key_hash)
        records_marked_for_deletion = current_hashed.join(deleted_key_hash, "key_hash", "inner")
        deleted_data = records_marked_for_deletion.withColumn(
            self.__END_AT_col, expr(f"cast('{self.update_timestamp}' as timestamp) - interval 1 milliseconds"))

        # Updated records: the key exists on both sides, but the data hash differs.
        updated_key_hash = (current_key_hash.join(new_key_hash, "key_hash", "inner")).subtract(matched_key_hash)
        updated_data = (incoming_hashed.join(updated_key_hash, "key_hash", "inner")
                        .withColumn(self.__START_AT_col, expr(f"cast('{self.update_timestamp}' as timestamp)")))

        # Expire the previous version of every updated record.
        expired_data_prep = current_hashed.join(updated_key_hash, "key_hash", "inner")
        expired_data = expired_data_prep.withColumn(
            self.__END_AT_col, expr(f"cast('{self.update_timestamp}' as timestamp) - interval 1 milliseconds"))

        hash_cols = ["data_hash", "key_hash", "combined_hash"]
        return (matched_data.drop(*hash_cols),
                brand_incoming_changes.drop(*hash_cols),
                deleted_data.drop(*hash_cols),
                updated_data.drop(*hash_cols),
                expired_data.drop(*hash_cols))
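To show how the class could be wired into a pipeline, below is a minimal usage sketch. The table names (dw.customer_dim, staging.customer_extract, dw.customer_dim_next), the key and non-key column lists, and the final union that assembles the next snapshot of the dimension are assumptions for illustration, not part of the original implementation:

```python
from datetime import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical inputs: the current SCD2 dimension and the latest extract from the source.
current_dim = spark.table("dw.customer_dim")
incoming = spark.table("staging.customer_extract")

scd = ScdTwo(
    current_data=current_dim,
    incoming_changes=incoming,
    keys=["customer_id"],
    non_keys=["first_name", "last_name", "city", "country"],
    update_timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
)

# _get_changes() returns unchanged, new, deleted, updated, and expired records, in that order.
unchanged, new, deleted, updated, expired = scd._get_changes()

# One possible way to assemble the next snapshot of the dimension:
# keep unchanged rows, close out deleted and superseded versions, and add the new versions.
next_dim = (unchanged
            .unionByName(expired, allowMissingColumns=True)
            .unionByName(deleted, allowMissingColumns=True)
            .unionByName(updated, allowMissingColumns=True)
            .unionByName(new, allowMissingColumns=True))

# Write to a new table; in practice you would swap or merge this into the dimension table.
next_dim.write.mode("overwrite").saveAsTable("dw.customer_dim_next")
```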
Conclusion
Implementing Slowly Changing Dimension Type 2 (SCD2) from scratch offers flexibility, customization, and a deep understanding of data management processes. While building your own SCD2 solution requires careful design and coding, it allows for tailored logic specific to business needs, including custom update strategies, historical tracking, and performance optimization.
Some key takeaways include:
- Data integrity: Accurate historical tracking ensures reliable data analysis.
- Customization: Business-specific rules can be integrated seamlessly.
- Scalability: Well-designed SCD2 implementations scale with growing datasets.
However, be mindful of potential challenges such as data skew, performance bottlenecks, and maintenance overhead. Combining best practices like partitioning, indexing, and incremental processing can help overcome these hurdles.