PySpark: Java UDF Integration

This article walks through implementing a UDF (User Defined Function) in Java and invoking it from Spark SQL in PySpark.

By Kiran Kumar · Aug. 17, 18 · Tutorial

PySpark is the Python implementation of the Spark API. Although developers write PySpark applications in Python against the Spark APIs, internally the data Spark operates on is cached inside the JVM.

The Python driver program holds a SparkContext, which uses Py4J, a library for Python-Java interoperability, to launch a JVM and create a JavaSparkContext.
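
To see this bridge in action, the JVM launched through Py4J is reachable from the Python driver via the SparkContext; a minimal sketch below uses its internal _gateway and _jvm handles (internal attributes, shown purely for illustration, not something application code should rely on).

from pyspark import SparkContext

sc = SparkContext()

# _gateway is the Py4J JavaGateway; _jvm is its view of the launched JVM.
# Any call made through _jvm executes inside the JVM and the result is
# returned to Python over the Py4J connection.
print(sc._gateway)
print(sc._jvm.java.lang.System.getProperty("java.version"))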

At a high level, transformations on an RDD in Python are mapped to transformations on PythonRDD objects in the JVM.

PythonRDD objects on the executors launch Python subprocesses and communicate with them over pipes (internally sockets), transferring data and code that must be serialized and deserialized on each side.

To avoid this serialization and deserialization overhead, prefer Spark DataFrames over RDDs in PySpark, so that data stays inside the JVM for as long as possible.
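
As a rough illustration of the difference (the values and column name below are made up for this sketch): a lambda applied to an RDD is shipped to the Python worker processes, so every element is serialized out of the JVM and back, whereas an equivalent DataFrame expression is planned and executed by Spark SQL inside the JVM.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# RDD path: the lambda runs in Python worker processes, so each element is
# serialized from the JVM to Python and the result serialized back.
rdd_result = spark.sparkContext.parallelize([1, 2, 3]).map(lambda x: x + 5).collect()

# DataFrame path: the expression is evaluated by Spark SQL inside the JVM;
# rows only leave the JVM when collect() is called.
df = spark.createDataFrame([(1,), (2,), (3,)], ["num"])
df_result = df.selectExpr("num + 5 AS num").collect()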

The remainder of this article shows how to implement a UDF (User Defined Function) in Java and invoke it from Spark SQL in PySpark.

User Defined Functions are used in Spark SQL for custom data transformations and are very useful when the built-in Spark functions cannot express a required business rule.

When a UDF is implemented in Python, the data is serialized in the JVM and passed to a separate Python process where the UDF runs; the result is then serialized again and deserialized back into the JVM.

This incurs significant performance overhead, because every row makes two serialization/deserialization trips.

Fig 1. UDF implemented in Python and used from PySpark SQL
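
For comparison, this is roughly what the Python-side implementation sketched in Fig 1 looks like (the function name addFive is made up for this example):

from pyspark.sql import SparkSession
from pyspark.sql.types import LongType

spark = SparkSession.builder.getOrCreate()

# A Python UDF: every input value is serialized from the JVM to a Python
# worker, the lambda runs there, and the result is serialized back to the JVM.
spark.udf.register("addFive", lambda num: num + 5, LongType())

spark.sql("SELECT addFive(2) AS result").show()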

To avoid this overhead, the recommendation is to implement the UDF in Java or Scala so that it executes inside the JVM itself, eliminating the extra serialization round trips to the Python workers.

Fig 2. UDF implemented in Java and used from PySpark SQL

Below is a proof of concept that integrates a Java UDF into PySpark code.

First, implement two Java classes that implement the org.apache.spark.sql.api.java.UDF1 interface.

package com.JavaUDFProj;

import org.apache.spark.sql.api.java.UDF1;

public class AddNumber implements UDF1<Long, Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long call(Long num) throws Exception {
        return num + 5;
    }
}

AddNumber.java takes a Long as its input argument, adds 5 to it, and returns the result.

Another class, MultiplyNumber.java, takes a Long as its input argument, multiplies it by 5, and returns the result.

package com.JavaUDFProj;

import org.apache.spark.sql.api.java.UDF1;

public class MultiplyNumber implements UDF1<Long, Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long call(Long num) throws Exception {
        return num * 5;
    }
}

The Maven dependencies needed to compile these two UDF1 implementations are as follows:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.0.0</version>
</dependency>

Compile the two Java classes above and package them into a single jar, named in this example "javaudfdemo.jar".

The following PySpark code was tested with Spark 2.3.1:

import json

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType

sc = SparkContext()
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse").getOrCreate()

# Register the Java UDF classes under the names used in Spark SQL
spark.udf.registerJavaFunction("numAdd", "com.JavaUDFProj.AddNumber", LongType())
spark.udf.registerJavaFunction("numMultiply", "com.JavaUDFProj.MultiplyNumber", LongType())

# Build a small DataFrame from an in-memory JSON document
j = {'num1': 2, 'num2': 3}
a = [json.dumps(j)]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)

# Register the DataFrame as a temporary table and apply the Java UDFs via SQL
df.registerTempTable("numbersdata")
df1 = spark.sql("SELECT numMultiply(num1) AS num1, numAdd(num2) AS num2 FROM numbersdata")
df1.show(10)

In PySpark, register the two Java UDF classes as shown below:

spark.udf.registerJavaFunction("numAdd", "com.JavaUDFProj.AddNumber", LongType())

spark.udf.registerJavaFunction("numMultiply", "com.JavaUDFProj.MultiplyNumber", LongType())

registerJavaFunction takes three arguments: the function name to be used in Spark SQL, the fully qualified name of the Java class that implements the UDF, and the UDF's return type. It registers the UDF so that it can be called from Spark SQL.

In this example, a JSON document is given as input and converted into a DataFrame.

A SQL query is issued against the DataFrame, which is registered as a temporary table named "numbersdata", and the columns are transformed using the UDFs.

In this example, the transformation functions are the UDFs registered as "numMultiply" and "numAdd".
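
Once registered this way, the same Java UDFs can also be invoked through the DataFrame API using SQL expressions instead of a full SQL statement; a minimal sketch reusing the df DataFrame from the listing above:

from pyspark.sql.functions import expr

# Call the registered Java UDFs through SQL expressions on the DataFrame API
df2 = df.select(expr("numMultiply(num1)").alias("num1"),
                expr("numAdd(num2)").alias("num2"))
df2.show()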

Run the PySpark application with the following command from a Windows command prompt or shell.

Issue the spark-submit command from the folder containing the artifacts; in this example, the jar and the Python file are in the same location where spark-submit is invoked.

spark-submit --jars javaudfdemo.jar test.py

The output is as below:

+----+----+
|num1|num2|
+----+----+
|  10|   8|
+----+----+

In this way, user-defined functions implemented in Java can be called from PySpark, improving application performance compared to implementing the same functions in Python.
