Transforming Data Into JSON Structure With Spark: API-Based and SQL-Based Approaches

In this blog, we will explore transformations in Spark using API-based and SQL-based approaches to convert data into JSON payloads for building pipelines.

By Venugopal Reddy Modhugu · Jul. 18, 23 · Tutorial

In the world of data processing and analytics, Apache Spark has emerged as a powerful tool that lets developers and data engineers handle structured and semi-structured data efficiently. Its distributed processing engine can manage large datasets and execute transformations in parallel across multiple partitions.

When working with large volumes of data, Spark partitions the data and distributes it across the machines in the cluster. Spark performs transformations on each partition independently, which improves performance and scalability. In this article, we will explore how to read data from different tables, perform a join operation, and transform the result into a JSON structure using Spark SQL from Java. JSON is a widely used format for exchanging data between systems.

With multiple partitions, we can send the transformed data to different target systems in parallel. For example, each transformed JSON payload can be sent to a REST API endpoint. This allows for seamless integration between Spark SQL and other systems in a distributed environment.
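As a rough sketch of the REST hand-off, the class below posts every JSON payload from one partition using the JDK's built-in HttpClient (Java 11+). The class name PartitionRestSender, its methods, and the endpoint URL are hypothetical and not part of the original pipeline; the code deliberately uses only the JDK so it can be read on its own.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Iterator;

// Hypothetical helper: sends the JSON payloads of a single partition to a REST endpoint.
public class PartitionRestSender {

    // Builds a POST request that carries one JSON document as its body.
    static HttpRequest buildRequest(String endpoint, String jsonPayload) {
        return HttpRequest.newBuilder()
                .uri(URI.create(endpoint))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
                .build();
    }

    // Sends each payload in turn and returns how many got a 2xx response.
    static int sendPartition(HttpClient client, String endpoint, Iterator<String> payloads)
            throws Exception {
        int accepted = 0;
        while (payloads.hasNext()) {
            HttpResponse<String> response = client.send(
                    buildRequest(endpoint, payloads.next()),
                    HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() / 100 == 2) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Assumed endpoint, for illustration only; no network call is made here.
        HttpRequest request = buildRequest("http://localhost:8080/students", "{\"student_id\":1}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

In a Spark job, one plausible wiring is to convert the result to JSON strings with toJSON() and call sendPartition from inside foreachPartition, creating the HttpClient within the partition function so that it is not serialized from the driver.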

Additionally, JSON is well-suited for publishing data to Apache Kafka, a distributed streaming platform. Kafka enables the efficient and scalable processing of data streams, and its messaging system accepts messages in various formats. By transforming the data into JSON format, you can publish it to Kafka topics, allowing downstream applications to process the events in real time.

Moreover, data transformed into a JSON structure can be stored in NoSQL databases like MongoDB. NoSQL databases are designed to handle large volumes of unstructured data, and the JSON format makes storage and querying easier.

Spark can be connected to various data sources like relational databases, streaming systems, and cloud-based storage. In this article, we will focus on reading data from three CSV files and discuss two approaches, i.e., the API-based approach and the SQL-based approach.

Students Table Schema:

  student_id: Integer
  name: String
  age: Integer
  gender: String

Courses Table Schema:

  student_id: Integer
  course_id: Integer
  course_name: String

Address Table Schema:

  student_id: Integer
  street: String
  city: String
  state: String
  country: String
  zip_code: String
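Spark's CSV reader treats every column as a string unless a schema is supplied or inferred. As a minimal sketch, the three schemas above could be declared explicitly with Spark's StructType. The class and method names (TableSchemas, students, courses, address) are illustrative and not part of the original article; the code assumes the spark-sql dependency is on the classpath.

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper class: declares the three table schemas listed above.
public class TableSchemas {

    static StructType students() {
        return new StructType()
                .add("student_id", DataTypes.IntegerType)
                .add("name", DataTypes.StringType)
                .add("age", DataTypes.IntegerType)
                .add("gender", DataTypes.StringType);
    }

    static StructType courses() {
        return new StructType()
                .add("student_id", DataTypes.IntegerType)
                .add("course_id", DataTypes.IntegerType)
                .add("course_name", DataTypes.StringType);
    }

    static StructType address() {
        return new StructType()
                .add("student_id", DataTypes.IntegerType)
                .add("street", DataTypes.StringType)
                .add("city", DataTypes.StringType)
                .add("state", DataTypes.StringType)
                .add("country", DataTypes.StringType)
                .add("zip_code", DataTypes.StringType);
    }

    public static void main(String[] args) {
        // Print one schema as a tree to check the column names and types.
        students().printTreeString();
    }
}
```

A reader would then pass one of these with .schema(TableSchemas.students()) before calling .load("students.csv"), which avoids relying on type inference.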

For both approaches, add the following Maven dependencies to pom.xml:

XML
 
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.13</artifactId>
    <version>3.3.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>3.3.0</version>
</dependency>


Approach 1: API-Based Approach

We first need to create a SparkSession, which acts as the entry point for all Spark operations.

Java
 
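import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class JsonDataTransformation {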
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Json Data Transformation")
                .getOrCreate();

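        // Read data from the tables; inferSchema keeps numeric columns
        // (student_id, age, course_id) as numbers instead of strings
        Dataset<Row> studentsDF = spark.read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load("students.csv");

        Dataset<Row> coursesDF = spark.read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load("courses.csv");

        Dataset<Row> addressDF = spark.read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load("address.csv");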

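        // Inner-join the three tables on the shared student_id key
        Dataset<Row> joinedDF = studentsDF.join(coursesDF, studentsDF.col("student_id").equalTo(coursesDF.col("student_id")), "inner")
                .join(addressDF, studentsDF.col("student_id").equalTo(addressDF.col("student_id")), "inner");

        // Collapse to one row per student: courses become an array of structs,
        // and the address columns become a single nested struct
        Dataset<Row> resultDF = joinedDF.groupBy(studentsDF.col("student_id"))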
                .agg(
                        functions.first(studentsDF.col("name")).as("name"),
                        functions.first(studentsDF.col("age")).as("age"),
                        functions.first(studentsDF.col("gender")).as("gender"),
                        functions.collect_list(
                                functions.struct(coursesDF.col("course_id"), coursesDF.col("course_name"))
                        ).as("courses"),
                        functions.struct(
                                functions.first(addressDF.col("street")).as("street"),
                                functions.first(addressDF.col("city")).as("city"),
                                functions.first(addressDF.col("state")).as("state"),
                                functions.first(addressDF.col("country")).as("country"),
                                functions.first(addressDF.col("zip_code")).as("zip_code")
                        ).as("address")
                );

        // Write the result as JSON; Spark creates a directory named output.json
        // containing part files with one JSON record per line
        resultDF.write().format("json").save("output.json");
    }
}


In the above code snippet, we start by creating a SparkSession and reading the data from the three files: "students," "courses," and "address." We then perform join operations using the join method, specifying the common column between the tables.

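Next, we transform the joined DataFrame into the desired JSON structure using the groupBy and agg functions: collect_list gathers each student's courses into an array of structs, and struct folds the address columns into a single nested object. Finally, we write the transformed data out in JSON format.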
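To make the shape of that aggregation concrete, here is a plain-Java analogue that runs on a single JVM without Spark. It is only an illustration of what grouping by student_id and collecting courses into a list produces; the class name GroupingSketch, the nest method, and the reduced row layout (student_id, name, course_id, course_name) are assumptions made for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-JVM analogue of groupBy(student_id) + collect_list(struct(...)).
public class GroupingSketch {

    // Each input row is {student_id, name, course_id, course_name}.
    static Map<Integer, Map<String, Object>> nest(List<Object[]> joinedRows) {
        Map<Integer, Map<String, Object>> byStudent = new LinkedHashMap<>();
        for (Object[] row : joinedRows) {
            Integer studentId = (Integer) row[0];
            // Create the student entry on first sight, similar to first(name).
            Map<String, Object> student = byStudent.computeIfAbsent(studentId, id -> {
                Map<String, Object> s = new LinkedHashMap<>();
                s.put("student_id", id);
                s.put("name", row[1]);
                s.put("courses", new ArrayList<Map<String, Object>>());
                return s;
            });
            // Build one course "struct" and append it, similar to collect_list.
            Map<String, Object> course = new LinkedHashMap<>();
            course.put("course_id", row[2]);
            course.put("course_name", row[3]);
            @SuppressWarnings("unchecked")
            List<Map<String, Object>> courses =
                    (List<Map<String, Object>>) student.get("courses");
            courses.add(course);
        }
        return byStudent;
    }

    public static void main(String[] args) {
        List<Object[]> rows = List.of(
                new Object[] {1, "Foo", 101, "Mathematics"},
                new Object[] {1, "Foo", 102, "Physics"});
        System.out.println(nest(rows));
    }
}
```

Spark performs the same regrouping, but distributed across partitions and with a shuffle on the grouping key.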

Approach 2: SQL-Based Approach

Spark SQL allows us to write SQL queries directly:

Java
 
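import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonDataTransformation {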
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Json Data Transformation")
                .getOrCreate();

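        // Read data from the tables; inferSchema keeps numeric columns
        // (student_id, age, course_id) as numbers instead of strings
        Dataset<Row> studentsDF = spark.read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load("students.csv");

        Dataset<Row> coursesDF = spark.read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load("courses.csv");

        Dataset<Row> addressDF = spark.read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load("address.csv");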

        studentsDF.createOrReplaceTempView("students");
        coursesDF.createOrReplaceTempView("courses");
        addressDF.createOrReplaceTempView("address");

        // Re-register "courses" as one row per student with an array of course structs
        spark.sql("SELECT student_id, COLLECT_LIST(STRUCT(course_id, course_name)) AS Courses " +
                "FROM courses " +
                "GROUP BY student_id").createOrReplaceTempView("courses");

        // Re-register "address" with the address columns folded into a single struct
        spark.sql("SELECT student_id, STRUCT(street, city, state, country, zip_code) AS Address " +
                "FROM address").createOrReplaceTempView("address");


        // Perform the SQL query to obtain the desired JSON structure
        Dataset<Row> resultDF = spark.sql("SELECT " +
                "s.student_id, " +
                "s.name, " +
                "s.age, " +
                "s.gender, " +
                "tc.Courses AS courses, " +
                "ta.Address  AS address " +
                "FROM students s " +
                "LEFT JOIN courses tc ON s.student_id = tc.student_id " +
                "LEFT JOIN address ta ON s.student_id = ta.student_id");

        // Write the result as JSON; Spark creates a directory named output.json
        // containing part files with one JSON record per line
        resultDF.write().format("json").save("output.json");
    }
}


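In the code snippet above, we use the createOrReplaceTempView method to register each DataFrame as a temporary view with its corresponding name. We first convert courses into a JSON array and address into a JSON object, re-registering each result under the original view name, and then write a SQL query that joins the views on the common key, student_id. Note that this version uses LEFT JOIN, so students without courses or an address are kept with null fields, whereas the inner joins in Approach 1 drop them. Finally, we write the transformed data out in JSON format.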

Replacing the original temporary views with the output of each transformation is helpful for a configuration-driven pipeline, because every step can then be described by just a view name and a query.
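One way such a configuration-driven pipeline might look is an ordered list of steps, each naming a view and the SQL that rebuilds it. The sketch below is hypothetical: ViewPipeline, Step, and run are invented names, and the Spark call is abstracted behind a callback so the example runs with the JDK alone.

```java
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of a configuration-driven sequence of view replacements.
public class ViewPipeline {

    // One configured step: run `sql`, then register the result under `viewName`.
    static final class Step {
        final String viewName;
        final String sql;

        Step(String viewName, String sql) {
            this.viewName = viewName;
            this.sql = sql;
        }
    }

    // Applies the steps in order; `registerView` receives (viewName, sql)
    // and is the place where Spark would be invoked.
    static void run(List<Step> steps, BiConsumer<String, String> registerView) {
        for (Step step : steps) {
            registerView.accept(step.viewName, step.sql);
        }
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
                new Step("courses",
                        "SELECT student_id, COLLECT_LIST(STRUCT(course_id, course_name)) AS Courses "
                                + "FROM courses GROUP BY student_id"),
                new Step("address",
                        "SELECT student_id, STRUCT(street, city, state, country, zip_code) AS Address "
                                + "FROM address"));
        // Stand-in for Spark: print what would be executed.
        run(steps, (view, sql) -> System.out.println(view + " <- " + sql));
    }
}
```

With a SparkSession named spark, the callback would presumably be (view, sql) -> spark.sql(sql).createOrReplaceTempView(view), and the list of steps could be loaded from a configuration file instead of being hard-coded.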

By using the same approach and transformations, we can extract data from multiple tables and map the fields from these tables to a single object or an array in the target JSON document. We can also move fields from one table to multiple objects in the resulting JSON structure. For larger datasets, we can process each partition in parallel.

Example of the transformed JSON structure (pretty-printed here for readability; Spark writes each record on a single line):

JSON
 
{
  "student_id": 1,
  "name": "Foo",
  "age": 25,
  "gender": "Male",
  "courses": [
    {
      "course_id": 101,
      "course_name": "Mathematics"
    },
    {
      "course_id": 102,
      "course_name": "Physics"
    }
  ],
  "address": {
    "street": "123 Main St",
    "city": "Dallas",
    "state": "Texas",
    "country": "USA",
    "zip_code": "10001"
  }
}


Conclusion

To summarize, we have explored two approaches for reading data from multiple tables and transforming it into a JSON structure using Spark. The API-based approach uses Spark's DataFrame API to perform transformations, while the SQL-based approach allows us to write SQL queries directly. Both approaches provide flexibility and efficiency in handling complex data transformation tasks when building pipelines.

