The Right Way to Use Spark and JDBC

Apache Spark is a wonderful tool, but sometimes it needs a bit of tuning. We look at a use case involving reading data from a JDBC source.

By Avi Yehuda · Dec. 19, 18

A while ago I had to read data from a MySQL table, do a bit of manipulation on that data, and store the results on disk.

The obvious choice was to use Spark, as I was already using it for other things and it seemed super easy to implement.

This is more or less what I had to do (I removed the part that does the manipulation for the sake of simplicity):

spark.read.format("jdbc").
option("url", "jdbc:mysql://dbhost/sbschhema").
option("dbtable", "mytable").
option("user", "myuser").
option("password", "mypassword").
load().write.parquet("/data/out")

Looks good, only it didn't quite work: it was either super slow or it crashed entirely, depending on the size of the table.

Tuning Spark and the cluster properties helped a bit, but it didn't solve the problem.

Since I was using AWS EMR, it made sense to give Sqoop a try, since it is one of the applications supported on EMR.

sqoop import --verbose --connect jdbc:mysql://dbhost/sbschhema --username myuser --table opportunity --password  mypassword --m 20 --as-parquetfile --target-dir /data/out

Sqoop performed much better almost instantly; all you needed to do was set the number of mappers according to the size of the data, and it worked perfectly.

Since both Spark and Sqoop are based on the Hadoop MapReduce framework, it was clear that Spark could work at least as well as Sqoop; I only needed to find out how. I decided to look more closely at what Sqoop does and see if I could imitate it with Spark.

By turning on Sqoop's verbose flag, you can get a lot more detail. What I found was that Sqoop splits the input among the different mappers, which makes sense (this is MapReduce, after all, and Spark does the same thing). But before doing that, Sqoop does something smart that Spark doesn't do.

It first fetches the table's primary key (unless you give it another key to split the data by), then checks its minimum and maximum values. It then lets each of its mappers query the data, but with different boundaries on the key, so that the rows are split evenly between the mappers.

If, for example, the key's minimum value is 1, its maximum value is 100, and there are 5 mappers, then the query of the first mapper will look like this:

SELECT * FROM mytable WHERE mykey >= 1 AND mykey <= 20;

And the query of the second mapper will look like this:

SELECT * FROM mytable WHERE mykey >= 21 AND mykey <= 40;

And so on.
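This splitting scheme is easy to sketch. The helper below is a hypothetical illustration (not Sqoop's actual code): it divides the inclusive key range [min_key, max_key] into evenly sized, non-overlapping boundary pairs, one per mapper.

```python
def split_key_range(min_key, max_key, num_mappers):
    """Divide [min_key, max_key] into num_mappers contiguous,
    non-overlapping inclusive ranges, one per mapper."""
    total = max_key - min_key + 1
    base, extra = divmod(total, num_mappers)
    ranges = []
    lower = min_key
    for i in range(num_mappers):
        # spread any remainder over the first `extra` mappers
        size = base + (1 if i < extra else 0)
        upper = lower + size - 1
        ranges.append((lower, upper))
        lower = upper + 1
    return ranges

# With min=1, max=100 and 5 mappers, each mapper gets 20 keys:
# [(1, 20), (21, 40), (41, 60), (61, 80), (81, 100)]
```

Each pair then becomes one mapper's WHERE clause, exactly as in the queries above.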

This totally made sense: Spark was not working properly because it didn't know how to split the data between its mappers, so it was pulling the whole table through a single JDBC connection into a single partition.

So it was time to implement the same logic with Spark, which meant doing the following in my code:

  1. Fetch the primary key of the table.

  2. Find the key's minimum and maximum values.

  3. Execute Spark with those values.

This is the code I ended up with:

def main(args: Array[String]) {

  // parsing input parameters ...

  // fetch the primary key column name (5th column of SHOW KEYS output)
  val primaryKey = executeQuery(url, user, password,
    s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)
  // find its minimum and maximum values
  val result = executeQuery(url, user, password,
    s"SELECT MIN(${primaryKey}), MAX(${primaryKey}) FROM ${config("schema")}.${config("table")}")
  val min = result.getString(1).toInt
  val max = result.getString(2).toInt
  val numPartitions = (max - min) / 5000 + 1

  val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()

  val df = spark.read.format("jdbc").
    option("url", s"${url}${config("schema")}").
    option("driver", "com.mysql.jdbc.Driver").
    option("lowerBound", min).
    option("upperBound", max).
    option("numPartitions", numPartitions).
    option("partitionColumn", primaryKey).
    option("dbtable", config("table")).
    option("user", user).
    option("password", password).load()

  // some data manipulations here ...

  df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)

}

And it worked perfectly.

Remarks:

  1. The numPartitions value I set for Spark is just a value I found to give good results for this number of rows. It can be changed, since the size of the data is of course also affected by the column sizes and data types.

  2. The repartition action at the end is there to avoid ending up with many small files.
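For intuition, here is a simplified sketch (not Spark's actual source) of how the JDBC reader turns lowerBound, upperBound, and numPartitions into per-partition WHERE clauses. Note that the first and last partitions are left open-ended, so rows whose key falls outside [lowerBound, upperBound] are still read; the bounds only control how the work is divided.

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """Simplified sketch of per-partition WHERE clauses for a
    partitioned JDBC read. The first partition also picks up NULL
    keys, and the first/last partitions are unbounded on one side."""
    stride = (upper - lower) // num_partitions
    preds = []
    current = lower + stride
    for i in range(num_partitions):
        if i == 0:
            preds.append(f"{column} < {current} OR {column} IS NULL")
        elif i == num_partitions - 1:
            preds.append(f"{column} >= {current - stride}")
        else:
            preds.append(f"{column} >= {current - stride} AND {column} < {current}")
        current += stride
    return preds
```

With lower=1, upper=100, and 5 partitions, this yields five non-overlapping predicates that together cover every row, which is why setting these options lets each Spark task fetch its own evenly sized slice of the table.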


Published at DZone with permission of Avi Yehuda, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
