How Apache Spark Makes Your Slow MySQL Queries 10x Faster (or More)
For long-running queries, Spark can use all cores on all cluster nodes.
In this blog post, we’ll discuss how to improve the performance of slow MySQL queries using Apache Spark.
Introduction
In my previous blog post, I wrote about using Apache Spark with MySQL for data analysis and showed how to transform and analyze a large volume of data (text files) with Apache Spark. Vadim also performed a benchmark comparing the performance of MySQL and Spark with the Parquet columnar format (using air traffic performance data). That works great, but what if we don’t want to move our data from MySQL to another storage system (i.e., a columnar format), and instead want to run “ad hoc” queries on top of an existing MySQL server? Apache Spark can help here as well.
TL;DR Version
Using Apache Spark on top of the existing MySQL server(s) (without the need to export or even stream data to Spark or Hadoop), we can increase query performance more than ten times. Using multiple MySQL servers (replication or Percona XtraDB Cluster) gives us an additional performance increase for some queries. You can also use Spark’s cache function to cache the whole result of a MySQL query or table.
The idea is simple: Spark can read MySQL data via JDBC and can also execute SQL queries, so we can connect it directly to MySQL and run the queries. Why is this faster? For long-running (i.e., reporting or BI) queries, it can be much faster because Spark is a massively parallel system. MySQL can only use one CPU core per query, whereas Spark can use all cores on all cluster nodes. In my examples below, MySQL queries executed through Spark run 5-10 times faster (on top of the same MySQL data).
In addition, Spark can add “cluster” level parallelism. In the case of MySQL replication or Percona XtraDB Cluster, Spark can split a query into a set of smaller queries (for a partitioned table, for example, it will run one query per partition) and run those in parallel across multiple slave servers or multiple Percona XtraDB Cluster nodes. Finally, it uses map/reduce-type processing to aggregate the results.
I’ve used the same “Airlines On-Time Performance” database as in previous posts. Vadim created some scripts to download data and upload it to MySQL. You can find the scripts here. I’ve also used Apache Spark 2.0, which was released July 26, 2016.
Apache Spark Setup
Starting Apache Spark in standalone mode is easy. To recap:
- Download Apache Spark 2.0 and place it somewhere.
- Start master.
- Start slave (worker) and attach it to the master.
- Start the app (in this case spark-shell or spark-sql).
Example:
root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077
To connect to Spark, we can use spark-shell (Scala), pyspark (Python) or spark-sql. Since spark-sql is similar to the MySQL CLI, it is the easiest option (even “show tables” works). I also wanted to work with Scala in interactive mode, so I’ve used spark-shell as well. In all the examples I’m using the same SQL query in MySQL and Spark, so working with Spark is not that different.
To work with a MySQL server from Spark we need Connector/J for MySQL. Download the package, copy mysql-connector-java-5.1.39-bin.jar to the Spark directory, and add its path to conf/spark-defaults.conf:
spark.driver.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.39-bin.jar
spark.executor.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.39-bin.jar
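To verify that the connector is actually visible to Spark, you can try loading the driver class from spark-shell (a quick sanity check, not part of the original setup):
Class.forName("com.mysql.jdbc.Driver") // throws ClassNotFoundException if the JAR is not on the classpath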
Running MySQL Queries via Apache Spark
For this test I was using one physical server with 12 CPU cores (an older Intel(R) Xeon(R) CPU L5639 @ 2.13GHz), 48GB of RAM, and SSD disks. I’ve installed MySQL and started a Spark master and a Spark slave on the same box.
Now we are ready to run MySQL queries inside Spark. First, start the shell (from the Spark directory, /usr/local/spark in my case):
$ ./bin/spark-shell --driver-memory 4G --master spark://server1:7077
Then we need to connect to MySQL from Spark and register a temporary view:
val jdbcDF = spark.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=",
"dbtable" -> "ontime.ontime_part",
"fetchSize" -> "10000",
"partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2016", "numPartitions" -> "28"
)).load()
jdbcDF.createOrReplaceTempView("ontime")
So we have created a “datasource” for Spark (or, in other words, a “link” from Spark to MySQL). The Spark table name is “ontime” (linked to the MySQL ontime.ontime_part table), and we can run SQL queries in Spark, which will be parsed and translated into MySQL queries.
“partitionColumn” is very important here. It tells Spark to run multiple queries in parallel, one query per partition.
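To double-check how many parallel reads Spark will issue, you can inspect the partition count of the resulting DataFrame (a quick check; getNumPartitions is standard RDD API):
println(jdbcDF.rdd.getNumPartitions) // with lowerBound=1988, upperBound=2016 and numPartitions=28, this should print 28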
Now we can run the query:
val sqlDF = sql("select min(year), max(year) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and (origin = 'RDU' or dest = 'RDU') GROUP by carrier HAVING cnt > 100000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10")
sqlDF.show()
MySQL Query Example
Let’s go back to MySQL for a second and look at the query example. I’ve chosen the following query (from my older blog post):
select min(year), max(year) as max_year, Carrier, count(*) as cnt,
sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
FROM ontime
WHERE
DayOfWeek not in (6,7)
and OriginState not in ('AK', 'HI', 'PR', 'VI')
and DestState not in ('AK', 'HI', 'PR', 'VI')
GROUP by carrier HAVING cnt > 100000 and max_year > '1990'
ORDER by rate DESC, cnt desc
LIMIT 10
The query finds the total number of delayed flights for each airline. In addition, the query calculates a smarter “ontime” rating, taking into consideration the number of flights (we do not want to compare smaller air carriers with the large ones, and we want to exclude older airlines that are no longer in business).
The main reason I’ve chosen this query is that it is hard to optimize in MySQL. The conditions in the “where” clause only filter out ~30% of the rows; ~71% of the table still has to be scanned. I’ve done a basic calculation:
mysql> select count(*) FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI');
+-----------+
| count(*) |
+-----------+
| 108776741 |
+-----------+
mysql> select count(*) FROM ontime;
+-----------+
| count(*) |
+-----------+
| 152657276 |
+-----------+
mysql> select round((108776741/152657276)*100, 2);
+-------------------------------------+
| round((108776741/152657276)*100, 2) |
+-------------------------------------+
| 71.26 |
+-------------------------------------+
Table structure:
CREATE TABLE `ontime_part` (
`YearD` int(11) NOT NULL,
`Quarter` tinyint(4) DEFAULT NULL,
`MonthD` tinyint(4) DEFAULT NULL,
`DayofMonth` tinyint(4) DEFAULT NULL,
`DayOfWeek` tinyint(4) DEFAULT NULL,
`FlightDate` date DEFAULT NULL,
`UniqueCarrier` char(7) DEFAULT NULL,
`AirlineID` int(11) DEFAULT NULL,
`Carrier` char(2) DEFAULT NULL,
`TailNum` varchar(50) DEFAULT NULL,
...
`id` int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`id`,`YearD`),
KEY `covered` (`DayOfWeek`,`OriginState`,`DestState`,`Carrier`,`YearD`,`ArrDelayMinutes`)
) ENGINE=InnoDB AUTO_INCREMENT=162668935 DEFAULT CHARSET=latin1
/*!50100 PARTITION BY RANGE (YearD)
(PARTITION p1987 VALUES LESS THAN (1988) ENGINE = InnoDB,
PARTITION p1988 VALUES LESS THAN (1989) ENGINE = InnoDB,
PARTITION p1989 VALUES LESS THAN (1990) ENGINE = InnoDB,
PARTITION p1990 VALUES LESS THAN (1991) ENGINE = InnoDB,
PARTITION p1991 VALUES LESS THAN (1992) ENGINE = InnoDB,
PARTITION p1992 VALUES LESS THAN (1993) ENGINE = InnoDB,
PARTITION p1993 VALUES LESS THAN (1994) ENGINE = InnoDB,
PARTITION p1994 VALUES LESS THAN (1995) ENGINE = InnoDB,
PARTITION p1995 VALUES LESS THAN (1996) ENGINE = InnoDB,
PARTITION p1996 VALUES LESS THAN (1997) ENGINE = InnoDB,
PARTITION p1997 VALUES LESS THAN (1998) ENGINE = InnoDB,
PARTITION p1998 VALUES LESS THAN (1999) ENGINE = InnoDB,
PARTITION p1999 VALUES LESS THAN (2000) ENGINE = InnoDB,
PARTITION p2000 VALUES LESS THAN (2001) ENGINE = InnoDB,
PARTITION p2001 VALUES LESS THAN (2002) ENGINE = InnoDB,
PARTITION p2002 VALUES LESS THAN (2003) ENGINE = InnoDB,
PARTITION p2003 VALUES LESS THAN (2004) ENGINE = InnoDB,
PARTITION p2004 VALUES LESS THAN (2005) ENGINE = InnoDB,
PARTITION p2005 VALUES LESS THAN (2006) ENGINE = InnoDB,
PARTITION p2006 VALUES LESS THAN (2007) ENGINE = InnoDB,
PARTITION p2007 VALUES LESS THAN (2008) ENGINE = InnoDB,
PARTITION p2008 VALUES LESS THAN (2009) ENGINE = InnoDB,
PARTITION p2009 VALUES LESS THAN (2010) ENGINE = InnoDB,
PARTITION p2010 VALUES LESS THAN (2011) ENGINE = InnoDB,
PARTITION p2011 VALUES LESS THAN (2012) ENGINE = InnoDB,
PARTITION p2012 VALUES LESS THAN (2013) ENGINE = InnoDB,
PARTITION p2013 VALUES LESS THAN (2014) ENGINE = InnoDB,
PARTITION p2014 VALUES LESS THAN (2015) ENGINE = InnoDB,
PARTITION p2015 VALUES LESS THAN (2016) ENGINE = InnoDB,
PARTITION p_new VALUES LESS THAN MAXVALUE ENGINE = InnoDB) */
Even with a “covered” index, MySQL will have to scan ~70M-100M rows and create a temporary table:
mysql> explain select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10\G
*************************** 1. row ***************************
id: 1
select_type: SIMPLE
table: ontime_part
type: range
possible_keys: covered
key: covered
key_len: 2
ref: NULL
rows: 70483364
Extra: Using where; Using index; Using temporary; Using filesort
1 row in set (0.00 sec)
Here is the query response time in MySQL:
mysql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10;
+------------+----------+---------+----------+-----------------+------+
| min(yearD) | max_year | Carrier | cnt | flights_delayed | rate |
+------------+----------+---------+----------+-----------------+------+
| 2003 | 2013 | EV | 2962008 | 464264 | 0.16 |
| 2003 | 2013 | B6 | 1237400 | 187863 | 0.15 |
| 2006 | 2011 | XE | 1615266 | 230977 | 0.14 |
| 2003 | 2005 | DH | 501056 | 69833 | 0.14 |
| 2001 | 2013 | MQ | 4518106 | 605698 | 0.13 |
| 2003 | 2013 | FL | 1692887 | 212069 | 0.13 |
| 2004 | 2010 | OH | 1307404 | 175258 | 0.13 |
| 2006 | 2013 | YV | 1121025 | 143597 | 0.13 |
| 2003 | 2006 | RU | 1007248 | 126733 | 0.13 |
| 1988 | 2013 | UA | 10717383 | 1327196 | 0.12 |
+------------+----------+---------+----------+-----------------+------+
10 rows in set (19 min 16.58 sec)
19 minutes is definitely not great.
SQL in Spark
Now we want to run the same query inside Spark and let Spark read data from MySQL. We will create a “datasource” and execute the query:
scala> val jdbcDF = spark.read.format("jdbc").options(
| Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
| "dbtable" -> "ontime.ontime_sm",
| "fetchSize" -> "10000",
| "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2015", "numPartitions" -> "48"
| )).load()
16/08/02 23:24:12 WARN JDBCRelation: The number of partitions is reduced because the specified number of partitions is less than the difference between upper bound and lower bound. Updated number of partitions: 27; Input number of partitions: 48; Lower bound: 1988; Upper bound: 2015.
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, YearD: date ... 19 more fields]
scala> jdbcDF.createOrReplaceTempView("ontime")
scala> val sqlDF = sql("select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10")
sqlDF: org.apache.spark.sql.DataFrame = [min(yearD): date, max_year: date ... 4 more fields]
scala> sqlDF.show()
+----------+--------+-------+--------+---------------+----+
|min(yearD)|max_year|Carrier| cnt|flights_delayed|rate|
+----------+--------+-------+--------+---------------+----+
| 2003| 2013| EV| 2962008| 464264|0.16|
| 2003| 2013| B6| 1237400| 187863|0.15|
| 2006| 2011| XE| 1615266| 230977|0.14|
| 2003| 2005| DH| 501056| 69833|0.14|
| 2001| 2013| MQ| 4518106| 605698|0.13|
| 2003| 2013| FL| 1692887| 212069|0.13|
| 2004| 2010| OH| 1307404| 175258|0.13|
| 2006| 2013| YV| 1121025| 143597|0.13|
| 2003| 2006| RU| 1007248| 126733|0.13|
| 1988| 2013| UA|10717383| 1327196|0.12|
+----------+--------+-------+--------+---------------+----+
Spark-shell does not show the query time out of the box; it can be retrieved from the Web UI or from spark-sql.
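If you only need a rough wall-clock number inside spark-shell, you can time the action with plain Scala (a simple sketch, not a Spark API):
val start = System.nanoTime()
sqlDF.show()
println(s"Query took ${(System.nanoTime() - start) / 1e9} seconds")
For proper numbers, I’ve re-run the same query in spark-sql: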
./bin/spark-sql --driver-memory 4G --master spark://thor:7077
spark-sql> CREATE TEMPORARY VIEW ontime
> USING org.apache.spark.sql.jdbc
> OPTIONS (
> url "jdbc:mysql://localhost:3306/ontime?user=root&password=",
> dbtable "ontime.ontime_part",
> fetchSize "1000",
> partitionColumn "yearD", lowerBound "1988", upperBound "2014", numPartitions "48"
> );
16/08/04 01:44:27 WARN JDBCRelation: The number of partitions is reduced because the specified number of partitions is less than the difference between upper bound and lower bound. Updated number of partitions: 26; Input number of partitions: 48; Lower bound: 1988; Upper bound: 2014.
Time taken: 3.864 seconds
spark-sql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10;
16/08/04 01:45:13 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
2003 2013 EV 2962008 464264 0.16
2003 2013 B6 1237400 187863 0.15
2006 2011 XE 1615266 230977 0.14
2003 2005 DH 501056 69833 0.14
2001 2013 MQ 4518106 605698 0.13
2003 2013 FL 1692887 212069 0.13
2004 2010 OH 1307404 175258 0.13
2006 2013 YV 1121025 143597 0.13
2003 2006 RU 1007248 126733 0.13
1988 2013 UA 10717383 1327196 0.12
Time taken: 139.628 seconds, Fetched 10 row(s)
So the response time of the same query is almost 10x faster (on the same server, just one box). But how was this query translated into MySQL queries, and why is it so much faster? Here is what is happening inside MySQL:
Inside MySQL
Spark:
scala> sqlDF.show()
[Stage 4:> (0 + 26) / 26]
MySQL:
mysql> select id, info from information_schema.processlist where info is not NULL and info not like '%information_schema%';
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | info |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 10948 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2001 AND yearD < 2002) |
| 10965 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2007 AND yearD < 2008) |
| 10966 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1991 AND yearD < 1992) |
| 10967 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1994 AND yearD < 1995) |
| 10968 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1998 AND yearD < 1999) |
| 10969 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2010 AND yearD < 2011) |
| 10970 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2002 AND yearD < 2003) |
| 10971 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2006 AND yearD < 2007) |
| 10972 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1990 AND yearD < 1991) |
| 10953 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2009 AND yearD < 2010) |
| 10947 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1993 AND yearD < 1994) |
| 10956 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD < 1989 or yearD is null) |
| 10951 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2005 AND yearD < 2006) |
| 10954 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1996 AND yearD < 1997) |
| 10955 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2008 AND yearD < 2009) |
| 10961 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1999 AND yearD < 2000) |
| 10962 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2011 AND yearD < 2012) |
| 10963 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2003 AND yearD < 2004) |
| 10964 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1995 AND yearD < 1996) |
| 10957 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2004 AND yearD < 2005) |
| 10949 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1989 AND yearD < 1990) |
| 10950 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1997 AND yearD < 1998) |
| 10952 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2013) |
| 10958 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1992 AND yearD < 1993) |
| 10960 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2000 AND yearD < 2001) |
| 10959 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2012 AND yearD < 2013) |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
26 rows in set (0.00 sec)
Spark is running 26 queries in parallel, which is great. As the table is partitioned, each query uses only one partition, but it scans that whole partition:
mysql> explain partitions SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2001 AND yearD < 2002)\G
*************************** 1. row ***************************
id: 1
select_type: SIMPLE
table: ontime_part
partitions: p2001
type: ALL
possible_keys: NULL
key: NULL
key_len: NULL
ref: NULL
rows: 5814106
Extra: Using where
1 row in set (0.00 sec)
In this case, as the box has 12 CPU cores / 24 threads, it efficiently executes 26 queries in parallel, and the partitioned table helps avoid contention issues (I wish MySQL could scan partitions in parallel, but it can’t at the time of writing).
Another interesting thing is that Spark can “push down” some of the conditions to MySQL, but only those inside the “where” clause. All GROUP BY/ORDER BY/aggregations are done inside Spark. Spark needs to retrieve the data from MySQL to satisfy those operations and will not push down GROUP BY/ORDER BY/etc. to MySQL.
That also means that queries without “where” conditions (for example, “select count(*) as cnt, carrier from ontime group by carrier order by cnt desc limit 10”) will have to retrieve all data from MySQL and load it into Spark (whereas MySQL would do all the GROUP BY work internally). Running it in Spark might be slower or faster (depending on the amount of data and use of indexes), but it also requires more resources and potentially more memory dedicated to Spark. The above query is translated into 26 queries, each doing a “select carrier from ontime_part where (yearD >= N AND yearD < N+1)”.
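To verify which predicates were pushed down, you can print the physical plan from spark-shell; the JDBC scan node lists them (the exact output format varies by Spark version):
sqlDF.explain()
// Look for "PushedFilters: [...]" in the JDBC scan: those conditions run in MySQL,
// while the aggregation and sort operators above it run inside Spark.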
Pushing Down the Whole Query Into MySQL
If we want to avoid sending all data from MySQL to Spark, we have the option of creating a temporary view on top of a query (similar to MySQL’s CREATE TEMPORARY TABLE ... AS SELECT). In Scala:
val tableQuery =
"(select yeard, count(*) from ontime group by yeard) tmp"
val jdbcDFtmp = spark.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=",
"dbtable" -> tableQuery,
"fetchSize" -> "10000"
)).load()
jdbcDFtmp.createOrReplaceTempView("ontime_tmp")
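The temporary view can then be queried like any other Spark table; for example:
sql("select * from ontime_tmp").show()
This sends the whole subquery to MySQL and only fetches the (small) aggregated result.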
In Spark SQL:
CREATE TEMPORARY VIEW ontime_tmp
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
dbtable "(select yeard, count(*) from ontime_part group by yeard) tmp",
fetchSize "1000"
);
select * from ontime_tmp;
Please note:
- We do not want to use “partitionColumn” here; otherwise we will see 26 queries like this in MySQL: “SELECT yeard, count(*) FROM (select yeard, count(*) from ontime_part group by yeard) tmp where (yearD >= N AND yearD < N+1)” (which is obviously not optimal)
- This is not a good use of Spark, more like a “hack.” The only good reason to do it is to be able to have the result of the query as a source of an additional query.
Query Cache in Spark
Another option is to cache the result of the query (or even the whole table) and then use .filter in Scala for faster processing. This requires sufficient memory dedicated to Spark. The good news is that we can add additional nodes to Spark and get more memory for the Spark cluster.
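For reference, once a view is registered, the same caching can be triggered from spark-shell through the catalog API (equivalent to the CACHE TABLE statement used below):
spark.catalog.cacheTable("ontime_latest") // same effect as "cache table ontime_latest"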
Spark SQL example:
CREATE TEMPORARY VIEW ontime_latest
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:mysql://localhost:3306/ontime?user=root&password=",
dbtable "ontime.ontime_part partition (p2013, p2014)",
fetchSize "1000",
partitionColumn "yearD", lowerBound "1988", upperBound "2014", numPartitions "26"
);
cache table ontime_latest;
spark-sql> cache table ontime_latest;
Time taken: 465.076 seconds
spark-sql> select count(*) from ontime_latest;
5349447
Time taken: 0.526 seconds, Fetched 1 row(s)
spark-sql> select count(*), dayofweek from ontime_latest group by dayofweek;
790896 1
634664 6
795540 3
794667 5
808243 4
743282 7
782155 2
Time taken: 0.541 seconds, Fetched 7 row(s)
spark-sql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_latest WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and (origin='RDU' or dest = 'RDU') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10;
2013 2013 MQ 9339 1734 0.19
2013 2013 B6 3302 516 0.16
2013 2013 EV 9225 1331 0.14
2013 2013 UA 1317 177 0.13
2013 2013 AA 5354 620 0.12
2013 2013 9E 5520 593 0.11
2013 2013 WN 10968 1130 0.1
2013 2013 US 5722 549 0.1
2013 2013 DL 6313 478 0.08
2013 2013 FL 2433 205 0.08
Time taken: 2.036 seconds, Fetched 10 row(s)
Here we cache partitions p2013 and p2014 in Spark. This retrieves the data from MySQL and loads it into Spark. After that, all queries run against the cached data and are much faster.
With Scala we can cache the result of a query and then use filters to only get the information we need:
val sqlDF = sql("SELECT flightdate, origin, dest, depdelayminutes, arrdelayminutes, carrier, TailNum, Cancelled, Diverted, Distance from ontime")
sqlDF.cache().show()
scala> sqlDF.filter("flightdate='1988-01-01'").count()
res5: Long = 862
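When the cached data is no longer needed, the memory can be released with the standard Dataset API:
sqlDF.unpersist()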
Using Spark with Percona XtraDB Cluster
As Spark can be used in cluster mode and scales with more and more nodes, reading data from a single MySQL server quickly becomes a bottleneck. We can use MySQL replication slave servers or Percona XtraDB Cluster (PXC) nodes as a Spark datasource. To test this, I’ve provisioned a Percona XtraDB Cluster with three nodes on AWS (m4.2xlarge Ubuntu instances) and also started Apache Spark on each node:
- Node1 (pxc1): Percona Server + Spark Master + Spark worker node + Spark SQL running
- Node2 (pxc2): Percona Server + Spark worker node
- Node3 (pxc3): Percona Server + Spark worker node
All the Spark worker nodes use the memory configuration option:
cat conf/spark-env.sh
export SPARK_WORKER_MEMORY=24g
Then I can start spark-sql (the Connector/J JAR file also needs to be copied to all nodes):
$ ./bin/spark-sql --driver-memory 4G --master spark://pxc1:7077
When creating a table, I still use localhost to connect to MySQL (url “jdbc:mysql://localhost:3306/ontime?user=root&password=xxx”). As the Spark worker nodes run on the same instances as the Percona XtraDB Cluster nodes, they use their local connections. Running a Spark SQL query then evenly distributes all 26 MySQL queries among the three MySQL nodes.
Alternatively, we can run the Spark cluster on a separate host and connect it to HAProxy, which in turn will load balance selects across multiple Percona XtraDB Cluster nodes.
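A minimal sketch of that setup, assuming HAProxy runs on a hypothetical host haproxy1 and listens on port 3306 in front of the PXC nodes:
val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://haproxy1:3306/ontime?user=root&password=xxx", // HAProxy endpoint (hypothetical host)
      "dbtable" -> "ontime.ontime_part",
      "fetchSize" -> "10000",
      "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2016", "numPartitions" -> "28"
  )).load()
// Each partition query opens its own connection, so HAProxy
// spreads the 28 queries across the cluster nodes.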
Query Performance Benchmark
Finally, here is the query response time test on the three AWS Percona XtraDB Cluster nodes:
Query 1: select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10;
Query / Index type | MySQL Time | Spark Time (3 nodes) | Times Improvement |
No covered index (partitioned) | 19 min 16.58 sec | 192.17 sec | 6.02 |
Covered index (partitioned) | 2 min 10.81 sec | 48.38 sec | 2.7 |
Query 2: select dayofweek, count(*) from ontime_part group by dayofweek;
Query / Index type | MySQL Time | Spark Time (3 nodes) | Times Improvement |
No covered index (partitioned) | 19 min 15.21 sec | 195.058 sec | 5.92 |
Covered index (partitioned) | 1 min 10.38 sec | 27.323 sec | 2.58 |
Now, this looks really good, but it can be better. With three m4.2xlarge nodes we have 8*3 = 24 cores in total (although they are shared between Spark and MySQL). We could expect a 10x improvement, especially without a covered index.
However, on m4.2xlarge the amount of RAM was not enough to serve the MySQL data from memory, so all reads went to EBS (non-provisioned IOPS), which only gave me ~120MB/sec. I’ve redone the test on a set of three dedicated servers:
- 28 cores E5-2683 v3 @ 2.00GHz
- 240GB of RAM
- Samsung 850 PRO
This time the test ran completely from RAM:
Query 1 (from the above)
Query / Index type | MySQL Time | Spark Time (3 nodes) | Times Improvement |
No covered index (partitioned) | 3 min 13.94 sec | 14.255 sec | 13.61 |
Covered index (partitioned) | 2 min 2.11 sec | 9.035 sec | 13.52 |
Query 2: select dayofweek, count(*) from ontime_part group by dayofweek;
Query / Index type | MySQL Time | Spark Time (3 nodes) | Times Improvement |
No covered index (partitioned) | 2 min 0.36 sec | 7.055 sec | 17.06 |
Covered index (partitioned) | 1 min 6.85 sec | 4.514 sec | 14.81 |
With this number of cores, and running from RAM, we actually do not have enough concurrency, as the table only has 26 partitions. So I’ve also tried the unpartitioned table with the id primary key, using 128 partitions.
Note About Partitioning
I’ve used a partitioned table (partitioned by year) in my tests to help reduce MySQL-level contention. At the same time, the “partitionColumn” option in Spark does not require the MySQL table to be partitioned. For example, if a table has a primary key, we can use this CREATE VIEW in Spark:
CREATE OR REPLACE TEMPORARY VIEW ontime
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:mysql://127.0.0.1:3306/ontime?user=root&password=",
dbtable "ontime.ontime",
fetchSize "1000",
partitionColumn "id", lowerBound "1", upperBound "162668934", numPartitions "128"
);
Assuming we have enough MySQL servers (i.e., nodes or slaves), we can increase the number of partitions, which improves parallelism (as opposed to only 26 partitions when partitioning by year). Actually, the above test gives an even better response time: 6.44 seconds for query 1.
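If the id range is not known in advance, the lower and upper bounds can be fetched from MySQL first. A minimal sketch, reusing the connection options from above (use getLong instead of getInt if the column maps to BIGINT):
val bounds = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://127.0.0.1:3306/ontime?user=root&password=",
      "dbtable" -> "(select min(id) as mn, max(id) as mx from ontime.ontime) t"
  )).load().collect()(0)
val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://127.0.0.1:3306/ontime?user=root&password=",
      "dbtable" -> "ontime.ontime",
      "fetchSize" -> "1000",
      "partitionColumn" -> "id",
      "lowerBound" -> bounds.getInt(0).toString,  // min(id)
      "upperBound" -> bounds.getInt(1).toString,  // max(id)
      "numPartitions" -> "128"
  )).load()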
Where Spark Doesn’t Work Well
For faster queries (those that use indexes or can use an index efficiently) it does not make sense to use Spark. Retrieving data from MySQL and loading it into Spark is not free, and this overhead can be significant for fast queries. For example, a query like select count(*) from ontime_part where YearD = 2013 and DayOfWeek = 7 and OriginState = 'NC' and DestState = 'NC'; scans only 1,300 rows and returns instantly (0.00 seconds reported by MySQL).
An even better example is select max(id) from ontime_part. In MySQL, the query uses the index, and all calculations are done inside MySQL. Spark, on the other hand, has to retrieve all IDs (select id from ontime_part) from MySQL and calculate the maximum. That took 24.267 seconds.
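If such a query must go through Spark anyway, the whole-query pushdown trick shown earlier avoids shipping all the IDs (a sketch reusing the dbtable-subquery approach):
val jdbcMax = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=",
      "dbtable" -> "(select max(id) as max_id from ontime.ontime_part) tmp"
  )).load()
jdbcMax.show() // the single-row result is computed entirely inside MySQL, using the index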
Conclusion
Using Apache Spark as an additional engine on top of MySQL can help speed up slow reporting queries and adds much-needed scalability for long-running SELECT queries. In addition, Spark can help with caching results for frequent queries.
PS: Visual Explain Plan With Spark
The Spark Web GUI provides many ways to monitor Spark jobs. For example, it shows the “job” progress and the SQL visual explain details.
Published at DZone with permission of Alexander Rubin, DZone MVB.