Transformations of Varying JSON Payloads Using Spark-Streaming

With spark-streaming, you just have to create a read-stream from the data source so you can create the write-stream to load the data into a target data source.

By Rishabh Jain · Aug. 18, 21 · Tutorial


Spark Structured Streaming reads data from a source as a continuous stream. We create a read stream from the data source and then a write stream that loads the transformed data into a target data source.

For this demo, I will assume that different JSON payloads arrive on a Kafka topic, and that we need to transform them and write the results to another Kafka topic.

Create a ReadStream 

The input topic receives JSON payloads as messages continuously. We first read those messages into a DataFrame using Spark's readStream, which returns a streaming DataFrame that keeps pulling new payloads from the Kafka topic.


val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()


We can create a case class (e.g., CustomerUnion) that contains every possible field of the JSON payloads. Deriving the schema from it lets us run SELECT queries on the resulting DataFrame without failures, even when a payload omits some fields.


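import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType
import spark.implicits._ // needed for .as[String]

// Kafka delivers the payload as bytes in "value"; cast it to a JSON string
val rawDfValue = df.selectExpr("CAST(value AS STRING) AS value").as[String]
// Derive the schema from the union case class so every possible field becomes a column
val schema = ScalaReflection.schemaFor[CustomerUnion].dataType.asInstanceOf[StructType]
// Parse each payload; fields absent from a message come back as NULL
val extractedDFWithSchema = rawDfValue.select(from_json(col("value"), schema).as("data")).select("data.*")

extractedDFWithSchema.createOrReplaceTempView("tempView")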


This gives us a DataFrame, extractedDFWithSchema, whose columns are the fields of the payload. Registering it as the temporary view tempView lets us query it with SQL.

Sample Input Payloads

These are two sample input payloads. In practice there can be many more, and each may leave out some of the fields.

{
  "id": 1234,
  "firstName": "Jon",
  "lastName": "Butler",
  "City": "Newyork",
  "Email": "[email protected]",
  "Phone": "2323123"
}

{
  "firstName": "Jon",
  "lastName": "Butler",
  "City": "Newyork",
  "Email": "[email protected]",
  "Phone": "2323123"
}
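The article does not show the CustomerUnion case class itself. A minimal sketch for the two sample payloads might look like the following, assuming id is numeric and every other field is a string; field names mirror the JSON keys, including the capitalized City, Email, and Phone. The fromFields helper is hypothetical plain Scala, not Spark: it only illustrates how a key missing from a payload ends up empty (None here, NULL in the parsed DataFrame).

```scala
// Hypothetical union of every field that may appear in any payload.
// Option marks each field as possibly absent (NULL in the parsed DataFrame).
case class CustomerUnion(
    id: Option[Long],
    firstName: Option[String],
    lastName: Option[String],
    City: Option[String],
    Email: Option[String],
    Phone: Option[String]
)

// Hypothetical plain-Scala helper (not part of Spark): builds a CustomerUnion
// from already-parsed JSON fields, leaving any missing key as None.
def fromFields(fields: Map[String, Any]): CustomerUnion = {
  def str(key: String): Option[String] = fields.get(key).collect { case s: String => s }
  val id = fields.get("id").collect {
    case n: Int  => n.toLong
    case n: Long => n
  }
  CustomerUnion(id, str("firstName"), str("lastName"), str("City"), str("Email"), str("Phone"))
}

// The two sample payloads; the email is a placeholder because the article's value is elided.
val withId = fromFields(Map("id" -> 1234, "firstName" -> "Jon", "lastName" -> "Butler",
  "City" -> "Newyork", "Email" -> "user@example.com", "Phone" -> "2323123"))
val withoutId = fromFields(Map("firstName" -> "Jon", "lastName" -> "Butler",
  "City" -> "Newyork", "Email" -> "user@example.com", "Phone" -> "2323123"))
```

Using Option for every field keeps the derived Spark schema nullable, which is what allows a single case class to describe payloads that differ in which fields they carry.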

Sample Output Payloads

The output payload depends on the id field. If id is present, we treat the message as a user update and send only "Email" and "Phone", together with the id renamed to "userid". The fields can be chosen according to any condition; this is just an example.

If id is not present, we send all the fields, combining firstName and lastName into "fullname". The two corresponding output payloads are:

{
  "userid": 1234,
  "Email": "[email protected]",
  "Phone": "2323123"
}

{
  "fullname": "Jon Butler",
  "City": "Newyork",
  "Email": "[email protected]",
  "Phone": "2323123"
}
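To make the two cases explicit, the routing rule can be sketched in plain Scala, outside Spark. This is an illustration only: it redefines a hypothetical CustomerUnion case class so that it stands on its own, and toOutput is a hypothetical helper. In the streaming job itself, the same rule would be expressed as SQL queries over tempView.

```scala
// Hypothetical union case class: every field a payload may carry, all optional.
case class CustomerUnion(
    id: Option[Long],
    firstName: Option[String],
    lastName: Option[String],
    City: Option[String],
    Email: Option[String],
    Phone: Option[String]
)

// Hypothetical helper encoding the rule described above:
//   id present -> update payload (userid, Email, Phone)
//   id absent  -> full payload (fullname, City, Email, Phone)
// Optional fields that are absent are simply left out of the result.
def toOutput(c: CustomerUnion): Map[String, Any] = c.id match {
  case Some(id) =>
    Map[String, Any]("userid" -> id) ++ c.Email.map("Email" -> _) ++ c.Phone.map("Phone" -> _)
  case None =>
    val fullName = Seq(c.firstName, c.lastName).flatten.mkString(" ")
    Map[String, Any]("fullname" -> fullName) ++
      c.City.map("City" -> _) ++ c.Email.map("Email" -> _) ++ c.Phone.map("Phone" -> _)
}

val email = Some("user@example.com") // placeholder; the article's email value is elided
val update = toOutput(CustomerUnion(Some(1234L), Some("Jon"), Some("Butler"), Some("Newyork"), email, Some("2323123")))
val full = toOutput(CustomerUnion(None, Some("Jon"), Some("Butler"), Some("Newyork"), email, Some("2323123")))
```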

Start WriteStreams

Once we have the DataFrame, we can run as many SQL queries as we need and write each result to a Kafka topic in the desired payload shape. To do this, we create a list of SQL queries, loop through it, and start a write stream for each query. Suppose we have a list called queryList that contains nothing but strings (i.e., SQL queries).
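The article does not show what goes into queryList. One possibility, assuming the tempView registered earlier and the two sample output payloads, is sketched below; these queries are an illustration, not the author's. Each query returns a key column and a JSON value column, because the write-stream function casts key and value before handing rows to the Kafka sink, which sends value as the message body. to_json, named_struct, and concat_ws are built-in Spark SQL functions.

```scala
// Hypothetical query list matching the two sample output payloads.
// Each query returns the "key" and "value" columns the Kafka sink expects,
// with "value" holding the output payload serialized by to_json.
val queryList: List[String] = List(
  // id present: user update carrying only userid, Email, and Phone
  """SELECT CAST(id AS STRING) AS key,
    |       to_json(named_struct('userid', id, 'Email', Email, 'Phone', Phone)) AS value
    |FROM tempView
    |WHERE id IS NOT NULL""".stripMargin,
  // id absent: full payload with firstName and lastName combined into fullname
  """SELECT CAST(NULL AS STRING) AS key,
    |       to_json(named_struct('fullname', concat_ws(' ', firstName, lastName),
    |                            'City', City, 'Email', Email, 'Phone', Phone)) AS value
    |FROM tempView
    |WHERE id IS NULL""".stripMargin
)
```

Splitting on id IS NOT NULL and id IS NULL keeps the two queries disjoint, so every incoming message produces exactly one output payload.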

The function below starts a write stream for a single query:

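def startWriteStream(query: String, checkpointDir: String): Unit = {
  // Each query must return a "key" and a "value" column for the Kafka sink
  val transformedDf = spark.sql(query)
  transformedDf
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    // Hypothetical output topic: write somewhere other than the input topic1
    .option("topic", "topic2")
    // The Kafka sink requires a checkpoint location, and each query needs its own
    .option("checkpointLocation", checkpointDir)
    .start()
}


This starts one write stream per query in the list, gives each its own checkpoint directory (the /tmp/checkpoints path is only a placeholder), and then blocks until any of the streams terminates.

queryList.zipWithIndex.foreach { case (query, i) =>
  startWriteStream(query, s"/tmp/checkpoints/query-$i")
}
spark.streams.awaitAnyTermination()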


Conclusion

If we know all the possible fields of the input payloads, our SQL queries won't fail even when some fields are missing. Because the payload schema is already defined by the case class, the DataFrame holds NULL for every absent field.

This way, we can use Spark Structured Streaming to route multiple payloads from the same topic to a different topic after the desired transformations and filters.



Opinions expressed by DZone contributors are their own.
