
AWS Serverless Data Lake: Built in Real Time Using Apache Hudi, AWS Glue, and Kinesis Stream

In an enterprise system, populating a data lake relies heavily on interdependent batch processes. Today’s business demands high-quality data in minutes or seconds.

By Gaurav Gupta · May 29, 2021 · Tutorial

In an enterprise system, populating a data lake relies heavily on interdependent batch processes, and these data lakes are typically refreshed only every few hours. Today's business demands high-quality data not in a matter of hours or days but in minutes or seconds.

The typical steps to update a data lake are to (a) build the incremental dataset and (b) read the existing data lake files, apply the incremental changes, and rewrite the files (note: S3 objects are immutable). This also introduces the challenge of keeping readers and writers of the data lake ACID compliant.
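For context, a naive batch refresh in Spark looks roughly like the sketch below. The bucket paths and the id join key are hypothetical; the point is only that every refresh ends up rewriting whole files.

Python

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the current lake and the latest incremental changes (hypothetical paths).
existing = spark.read.parquet("s3://my-bucket/lake/customers/")
increments = spark.read.parquet("s3://my-bucket/staging/customer_changes/")

# Keep every existing row that was not changed, then append the changed rows.
merged = existing.join(increments, "id", "left_anti").unionByName(increments)

# S3 objects are immutable, so the merged result has to be rewritten in full
# (here to a new prefix, which readers must then be switched over to).
merged.write.mode("overwrite").parquet("s3://my-bucket/lake/customers_v2/")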

Apache Hudi stands for Hadoop Upserts Deletes and Incrementals. Hudi is a data storage framework that sits on top of HDFS, S3, and similar stores. It brings streaming primitives that let you incrementally process record-level updates and deletes and fetch only the records that have changed.
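To make the incremental idea concrete, here is a minimal sketch of an incremental pull from a Hudi table. The S3 path matches the one used later in this article, the begin instant time is a hypothetical commit timestamp, and the option names can vary slightly between Hudi versions.

Python

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Fetch only the records committed after a given instant time (hypothetical value).
incremental_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '20210529000000'
}

changed = spark.read.format('org.apache.hudi') \
    .options(**incremental_options) \
    .load('s3://x14lambdasource/Unsaved/cust_hudi/test_data_20')
changed.show()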

In our setup, DynamoDB is the primary database, and changes in DynamoDB need to be reflected in the S3 data lake almost immediately. Here is the setup that brings this together:

Setup Architecture

  1. Enable Change Data Capture (CDC) on the DynamoDB table. The changes are pushed to a Kinesis data stream (a minimal boto3 sketch of enabling this follows the list).

  2. A Glue (Spark) job acts as the consumer of this change stream. The changes are micro-batched using a window length; in the script below, this is 100 seconds. The records are then processed and written to S3 using the Hudi connector libraries.
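For step 1, CDC can be enabled from the console or programmatically. Below is a minimal boto3 sketch under the assumption that the table and stream names are hypothetical and that the Kinesis stream has finished creating before it is attached to the table.

Python

import boto3

kinesis = boto3.client('kinesis')
dynamodb = boto3.client('dynamodb')

# Create the stream that will receive the DynamoDB change records (hypothetical name).
# Stream creation is asynchronous; wait for it to become ACTIVE before attaching it.
kinesis.create_stream(StreamName='cust-cdc-stream', ShardCount=1)
stream_arn = kinesis.describe_stream(StreamName='cust-cdc-stream')['StreamDescription']['StreamARN']

# Attach the stream to the table so every insert/update/delete is pushed to it.
dynamodb.enable_kinesis_streaming_destination(
    TableName='customer',  # hypothetical table name
    StreamArn=stream_arn
)

The Glue job for step 2 is shown next.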

Python

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import ApplyMapping

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .config('hoodie.datasource.hive_sync.use_jdbc', 'false') \
    .getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(spark.sparkContext)

# Core Hudi settings: record key, precombine field, table name, target S3 path, and Hive sync.
commonConfig = {
    'hoodie.datasource.write.hive_style_partitioning': 'true', 'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.write.precombine.field': 'id',
    'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.table.name': 'cust_hudi_f1',
    'hoodie.consistency.check.enabled': 'true', 'hoodie.datasource.hive_sync.database': 'default',
    'hoodie.datasource.hive_sync.table': 'cust_hudi_f1', 'hoodie.datasource.hive_sync.enable': 'true',
    'path': 's3://x14lambdasource/Unsaved/cust_hudi/test_data_20'
}

# Partitioning: two partition columns, derived below from the customer id.
partitionDataConfig = {
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.partitionpath.field': 'partitionkey,partitionkey2',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': 'partitionkey,partitionkey2'
}

# Upsert behavior and how many commits the cleaner retains (this enables time travel).
incrementalConfig = {
    'hoodie.upsert.shuffle.parallelism': 68, 'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 2
}

combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}

glue_temp_storage = "s3://x14lambdasource/Unsaved"

# Streaming DataFrame over the Kinesis stream registered in the Glue Data Catalog.
data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
    database="default", table_name="test_cdc_cust", transformation_ctx="DataSource0",
    additional_options={"startingPosition": "TRIM_HORIZON", "inferSchema": "true"}
)

def processBatch(data_frame, batchId):
    if data_frame.count() > 0:
        DataSource0 = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")

        # Flatten the DynamoDB CDC payload into plain columns.
        your_map = [
            ('eventName', 'string', 'eventName', 'string'),
            ('userIdentity', 'string', 'userIdentity', 'string'),
            ('eventSource', 'string', 'eventSource', 'string'),
            ('tableName', 'string', 'tableName', 'string'),
            ('recordFormat', 'string', 'recordFormat', 'string'),
            ('eventID', 'string', 'eventID', 'string'),
            ('dynamodb.ApproximateCreationDateTime', 'long', 'ApproximateCreationDateTime', 'long'),
            ('dynamodb.SizeBytes', 'long', 'SizeBytes', 'long'),
            ('dynamodb.NewImage.id.S', 'string', 'id', 'string'),
            ('dynamodb.NewImage.custName.S', 'string', 'custName', 'string'),
            ('dynamodb.NewImage.email.S', 'string', 'email', 'string'),
            ('dynamodb.NewImage.registrationDate.S', 'string', 'registrationDate', 'string'),
            ('awsRegion', 'string', 'awsRegion', 'string')
        ]

        new_df = ApplyMapping.apply(frame=DataSource0, mappings=your_map, transformation_ctx="applymapping1")
        abc = new_df.toDF()

        # Derive the two partition keys from the last two digits of the customer id.
        inputDf = abc.withColumn('update_ts_dms', to_timestamp(abc["registrationDate"])) \
                     .withColumn('partitionkey', abc["id"].substr(-1, 1)) \
                     .withColumn('partitionkey2', abc["id"].substr(-2, 1))

        # Write to S3 through the Hudi connector: either the Marketplace connector
        # (commented out) or the custom connector.
        # glueContext.write_dynamic_frame.from_options(frame = DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type = "marketplace.spark", connection_options = combinedConf)
        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(inputDf, glueContext, "inputDf"),
            connection_type="custom.spark",
            connection_options=combinedConf
        )

# Consume the stream in 100-second micro-batches, checkpointing to S3.
glueContext.forEachBatch(
    frame=data_frame_DataSource0, batch_function=processBatch,
    options={"windowSize": "100 seconds", "checkpointLocation": "s3://x14lambdasource/Unsaved/checkpoint1/"}
)


The bits to note in the script:

  1. The script assumes a simple customer record in DynamoDB with the following attributes being inserted or updated: id (partition key), custName (sort key), email, and registrationDate.

  2. Various configuration settings tell Hudi how to behave. We have enabled Hive sync, which means the metadata tables also get created in the AWS Glue Data Catalog. The table can then be queried in near real time using AWS Athena (see the sketch after this list).

  3. A partition key is specified in the configuration. For demonstration purposes, it is the last digit of the customer id, which effectively creates 10 partitions (0-9) and spreads customer data across them. Another setting tells Hudi to create Hive-style partitions.

  4. You can also control how many commits are retained through configuration, which allows you to time travel over the data.

  5. The Hudi write happens through a connector. There are two write lines in the script (one commented out): you can either use the Marketplace connector or your own custom connector.

  6. Finally, note the Glue forEachBatch call, where the consumer is set up to pull a batch of records every 100 seconds.
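As mentioned in note 2, once Hive sync has registered the table in the Glue Data Catalog, it can be queried with Athena. Here is a minimal boto3 sketch; the database and table names come from the script above, while the results bucket is hypothetical.

Python

import boto3

athena = boto3.client('athena')

# Query the Hudi table that Hive sync registered in the Glue Data Catalog.
response = athena.start_query_execution(
    QueryString="SELECT id, custName, email, registrationDate FROM cust_hudi_f1 LIMIT 10",
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://my-athena-results/'}  # hypothetical results bucket
)
print(response['QueryExecutionId'])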

Setup of the Hudi Connector in AWS Glue

  1. Marketplace connector: You can go to AWS Marketplace and search for "Apache Hudi Connector." The steps from there are straightforward and guided through the AWS console.

  2. Custom connector: In some organizations, AWS Marketplace access is not available. To enable the connector in that case, you need two JAR files: the Hudi-Spark bundle (hudi-spark-bundle_2.11-0.5.3-rc2), which I have compiled and placed in my GitHub repo for easy reference, and an Avro schema JAR, also available in my repo.
    To create the connector, go to AWS Glue Studio -> Create custom connector, select the hudi-spark-bundle_2.11-0.5.3-rc2 JAR as the S3 URL, set the connector type to Spark, and set the class name to org.apache.hudi.DefaultSource. When creating your Glue job with the custom connector, also include the Avro schema JAR as a dependent JAR (a boto3 sketch of creating such a job follows this list).
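If you script your infrastructure, the job wiring can also be done with boto3 when creating the Glue streaming job. The sketch below is illustrative only: the job name, IAM role, script location, connection name, and JAR path are all hypothetical, and the exact connection setup depends on how the custom connector was registered in Glue Studio.

Python

import boto3

glue = boto3.client('glue')

# Create the streaming job, attach the connection created from the custom connector,
# and include the dependent Avro JAR via --extra-jars.
glue.create_job(
    Name='dynamo-cdc-to-hudi',            # hypothetical job name
    Role='GlueServiceRoleForHudi',        # hypothetical IAM role
    GlueVersion='2.0',
    Command={
        'Name': 'gluestreaming',
        'ScriptLocation': 's3://x14lambdasource/scripts/hudi_stream_job.py',  # hypothetical path
        'PythonVersion': '3'
    },
    Connections={'Connections': ['hudi-custom-connection']},  # hypothetical connection name
    DefaultArguments={
        '--extra-jars': 's3://x14lambdasource/jars/avro-schema.jar'  # hypothetical JAR location
    }
)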

That is it. Now all inserts and updates in DynamoDB will be seamlessly reflected in your S3 data lake with near real-time access. The script and JARs are available in my GitHub repo.

Happy Coding!
