Alluxio on EMR: Fast Storage Access and Sharing for Spark Jobs

A tutorial on how to use the open source big data platform, Alluxio, as a means of creating faster storage access and data sharing for Spark jobs.

Traditionally, if you wanted to run a single Spark job on EMR, you might follow these steps: launch a cluster, run the job, which reads data from a storage layer like S3, perform transformations on an RDD/DataFrame/Dataset, and, finally, write the results back to S3. Doing it this way, you'd end up with something like this:

[Figure: Spark job on EMR]

If you add more Spark jobs across multiple clusters, you could end up with something like this:

[Figure: Multiple Spark jobs]

There are more use cases. For example, sometimes you need to store the intermediate result of Spark job A and use it as input for Spark job B; other times you have multiple Spark jobs that read the same dataset multiple times. As it stands, each of those Spark jobs has to read its input data from disk before processing it. What if the jobs could read their input from memory instead? Alluxio is one solution for that.

[Figure: image from https://www.alluxio.org/]

Alluxio is a storage layer that sits underneath the computation frameworks and is usually co-located with them, so it can provide fast storage and facilitate data sharing and locality between jobs, regardless of whether they run on the same computation engine.

I will show you how to set up Alluxio 1.8.1 on EMR 5.17 with a bootstrap script and compare the data loading performance.

Some more detail is described in the following repository: https://github.com/ChengzhiZhao/Alluxio-EMR-bootstrap

Running Spark Jobs With Alluxio

Step 1: Set Up a Bootstrap Script

Use the bootstrap.sh script, which downloads Alluxio 1.8.1 and sets up the required permissions on EMR.

Step 2: Add the Required Configuration

[
  {
    "Classification": "core-site",
    "Properties": {
      "fs.alluxio.impl": "alluxio.hadoop.FileSystem",
      "fs.AbstractFileSystem.alluxio.impl": "alluxio.hadoop.AlluxioFileSystem"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
          "spark.driver.extraClassPath": ":/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/opt/alluxio-1.8.1-hadoop-2.8/client/alluxio-1.8.1-client.jar",
          "spark.executor.extraClassPath": ":/opt/alluxio-1.8.1-hadoop-2.8/client/alluxio-1.8.1-client.jar"
     }
  }
]
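The Alluxio install directory and the client jar path have to match between the bootstrap script and the Spark classpath settings, so a version bump needs to be applied in both places. The following is a minimal shell sketch of deriving both paths from one pair of version strings. The function names are hypothetical (they are not part of Alluxio or EMR), and the directory layout is simply the one used in the configuration above.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not part of Alluxio or EMR): derive the install
# directory and client jar path from the Alluxio and Hadoop version strings.

alluxio_home() {
  local alluxio_version="$1" hadoop_version="$2"
  echo "/opt/alluxio-${alluxio_version}-hadoop-${hadoop_version}"
}

alluxio_client_jar() {
  local alluxio_version="$1" hadoop_version="$2"
  echo "$(alluxio_home "$alluxio_version" "$hadoop_version")/client/alluxio-${alluxio_version}-client.jar"
}

# Prints the jar path that the extraClassPath entries above point at.
alluxio_client_jar 1.8.1 2.8
```

The printed path can be pasted into spark.driver.extraClassPath and spark.executor.extraClassPath when the configuration is generated for a different version.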

Step 3: Launch the cluster, take a break, and relax.
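If you prefer the AWS CLI to the console, the launch can be sketched roughly as below. This is an illustration under assumptions, not the exact command used for this post: the bucket, key pair name, cluster name, instance type, and instance count are placeholders, and the function only prints the command so you can review it before running it yourself.

```shell
#!/usr/bin/env bash
# Prints (does not run) an `aws emr create-cluster` command for this setup.
# All argument values passed in below are placeholders, not values from the post.

build_create_cluster_cmd() {
  local bootstrap_s3_path="$1" config_file="$2" key_name="$3"
  printf '%s ' \
    aws emr create-cluster \
    --name "alluxio-spark-test" \
    --release-label emr-5.17.0 \
    --applications Name=Spark \
    --instance-type m4.xlarge \
    --instance-count 3 \
    --use-default-roles \
    --ec2-attributes "KeyName=${key_name}" \
    --bootstrap-actions "Path=${bootstrap_s3_path},Name=InstallAlluxio" \
    --configurations "file://${config_file}"
  echo
}

# Placeholder bucket, config file name, and key pair name.
build_create_cluster_cmd "s3://my-bucket/bootstrap.sh" "alluxio-config.json" "my-key"
```

Here alluxio-config.json is assumed to be a local file containing the JSON from Step 2, and bootstrap.sh is assumed to have been uploaded to your own S3 bucket first.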

Step 4: SSH to the EMR cluster and copy test data to make sure Alluxio is running.

cd /opt/alluxio-1.8.1-hadoop-2.8
# copy files to alluxio
bin/alluxio fs copyFromLocal LICENSE /Input
# Copied file:///opt/alluxio-1.8.1-hadoop-2.8/LICENSE to /Input
# verify files by listing alluxio
bin/alluxio fs ls /
# 26847   NOT_PERSISTED 02-18-2019 19:22:28:025 100% /Input

You can also check the Alluxio UI by going to {Master public DNS}:19999.

Click “Browse” and you should see the Input file there with a default block size of 128 MB.
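The same check can be scripted instead of read off the terminal or the UI. A small sketch, assuming the column layout of the listing shown above (the last two whitespace-separated fields are the in-memory percentage and the path); the function name is hypothetical, and paths containing spaces would not parse correctly.

```shell
#!/usr/bin/env bash
# Reads `alluxio fs ls` output on stdin and prints "<path> <in-memory %>" for
# each entry. Assumes the column layout shown in the listing above.

in_memory_report() {
  awk 'NF >= 2 { pct = $(NF - 1); sub(/%$/, "", pct); print $NF, pct }'
}

# Sample line copied from the listing above; on the cluster, pipe in
# `bin/alluxio fs ls /` instead.
echo "26847   NOT_PERSISTED 02-18-2019 19:22:28:025 100% /Input" | in_memory_report
```

A value of 100 for /Input means the whole file is held in Alluxio memory, which is what the later read tests rely on.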

Step 5: Use Spark shell to read the “Input” file from Alluxio.

# launch spark-shell (run this in the terminal)
spark-shell
// read data from Alluxio using the master's internal DNS name (in this case, ip-10-192-4-226.ec2.internal)
val s = sc.textFile("alluxio://ip-10-192-4-226.ec2.internal:19998/Input")
// s: org.apache.spark.rdd.RDD[String] = alluxio://ip-10-192-4-226.ec2.internal:19998/Input MapPartitionsRDD[1] at textFile at <console>:24
s.count
// res0: Long = 476
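The host name in the alluxio:// URI is specific to each cluster, so it helps to build the URI rather than type it by hand. A minimal sketch with a hypothetical helper: port 19998 is the one used in the example above, and defaulting the host to `hostname -f` is an assumption that only makes sense when the command runs on the master node.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build an alluxio:// URI for a path.
# The host defaults to this machine's fully qualified name, which is an
# assumption that only holds when run on the node hosting the Alluxio master.

alluxio_uri() {
  local path="$1" host="${2:-$(hostname -f)}"
  echo "alluxio://${host}:19998/${path#/}"
}

# Explicit host taken from the example above.
alluxio_uri /Input ip-10-192-4-226.ec2.internal
```

The printed URI is the string to pass to sc.textFile or spark.read in spark-shell.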

Step 6: Read and compare as a real example.

Note: You can use any test data set with Spark; I just picked a test Avro file I found online.

// read test_data_set from S3
val df = spark.read.format("com.databricks.spark.avro").load("s3://test_bucket/test_data_set/date=2019-04-18")
spark.time(df.count)
// Time taken: 29609 ms
// res0: Long = 86731200
// write df to Alluxio
spark.time(df.write.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set"))
// Time taken: 45775 ms
// first execution
spark.time(spark.read.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set").count)
// Time taken: 13477 ms
// res3: Long = 86731200
// second execution
spark.time(spark.read.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set").count)
// Time taken: 372 ms
// res4: Long = 86731200
// restart with another spark-shell
spark.time(spark.read.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set").count)
// Time taken: 9606 ms
// res0: Long = 86731200

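As you can see, the second execution is much faster (372 ms versus 13,477 ms), similar to what we would see after caching the data in Spark. Unlike a Spark cache, though, the benefit does not disappear with the session: after closing and opening another spark-shell, the read is still faster than the original read from S3 (9,606 ms versus 29,609 ms), because the data is read from Alluxio, which keeps it in memory. If you are interested, please refer to this great talk, Best Practices for Using Alluxio with Apache Spark, which covers benchmarks of Alluxio with Spark.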
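To put the timings from Step 6 side by side, the ratios can be worked out with a few lines of shell. This is only arithmetic on the numbers reported above; the function and variable names are made up for the illustration.

```shell
#!/usr/bin/env bash
# speedup BASELINE_MS CANDIDATE_MS -> ratio rounded to one decimal place.
speedup() {
  awk -v base="$1" -v cand="$2" 'BEGIN { printf "%.1f\n", base / cand }'
}

# Timings copied from the spark-shell output in Step 6.
s3_count_ms=29609          # count on the Avro data read from S3
alluxio_first_ms=13477     # first Parquet read from Alluxio
alluxio_second_ms=372      # second read in the same spark-shell
alluxio_new_shell_ms=9606  # read after restarting spark-shell

echo "first Alluxio read:  $(speedup "$s3_count_ms" "$alluxio_first_ms")x"
echo "second Alluxio read: $(speedup "$s3_count_ms" "$alluxio_second_ms")x"
echo "new spark-shell:     $(speedup "$s3_count_ms" "$alluxio_new_shell_ms")x"
```

Keep in mind that the S3 read used Avro while the Alluxio reads used Parquet, and each figure comes from a single run, so these ratios are indicative rather than a controlled benchmark.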

Performance Comparison (Count Job With Reading From Disk vs. Alluxio)


[Figure: Spark vs. Alluxio]

Pros:

  • Reading data as a DataFrame is faster from Alluxio than from S3.

Cons:

  • Alluxio keeps data in memory first to achieve its speed, so Spark jobs may have less memory available to run.
  • There is overhead, because the DataFrame has to be written to Alluxio first.

What hasn't been covered here is that Alluxio is also a great storage-sharing layer for multiple jobs that read and write data. For example, you could have a Flink job write to Alluxio and have Spark read that data later. There are more interesting topics to explore there. This post focuses on how to set up Alluxio 1.8.1 on EMR and run simple test data on it.
