
Automating Hadoop Computations on AWS

Today, we will cover a solution for automating Big Data (Hadoop) computations and, to show it in action, walk through an example that uses an open dataset.

The Hadoop framework provides a lot of useful tools for big data projects, but it is too complex to manage it all by yourself. Several months ago, I was deploying a Hadoop cluster using Cloudera, and I discovered that it works well only for an architecture in which compute and storage capacity is constant. Using a tool like Cloudera for a system that needs to scale is a nightmare. That is where cloud technologies come in and make our lives easier. Amazon Web Services (AWS) is the best option for this use case. AWS provides a managed solution for Hadoop called Elastic MapReduce (EMR). EMR allows developers to quickly start Hadoop clusters, run the necessary computations, and terminate the clusters when all the work is done. To automate this process even further, AWS provides an SDK for the EMR service. Using it, you can launch a Hadoop task with a single command. I'll show how it is done in the example below.

I am going to execute a Spark job on a Hadoop cluster in EMR. My goal is to compute the average comment length for each star rating (1-5) over a large dataset of amazon.com customer reviews. Usually, Hadoop computations require all the data to be stored in HDFS, but EMR integrates with S3, so we don't need to launch data instances and copy large amounts of data for the sake of a two-minute computation. This compatibility with S3 is a big advantage of using EMR. Many datasets are distributed via S3, including the one I'm using in this example (you can find it here).
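
Before touching EMR at all, you can check the dataset straight from S3. The following is a minimal sketch, assuming the amazon-reviews-pds bucket is still publicly readable; it uses anonymous (unsigned) requests, so no credentials are required:

import boto3
from botocore import UNSIGNED
from botocore.config import Config

# Anonymous S3 client: the reviews dataset lives in a public bucket.
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

# List a few of the .tsv.gz files under the tsv/ prefix.
listing = s3.list_objects_v2(Bucket='amazon-reviews-pds', Prefix='tsv/', MaxKeys=10)
for obj in listing.get('Contents', []):
    print(obj['Key'], obj['Size'])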

Initially, you should launch an EMR cluster manually (using the console) to let AWS create the default security groups for the master and slave instances (they will be required for the automated script below). To do that, go to the EMR service page, click ‘Create cluster,’ and launch a cluster with default settings. After that, terminate it, and you’ll have the two default security groups created for master and slave instances. You should also create an S3 bucket to store the results of the Spark job execution.
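
If you prefer to script that preparation too, the lookup of those default security groups and the creation of the results bucket can be done with boto3. This is only a sketch: the group names ElasticMapReduce-master and ElasticMapReduce-slave are the usual defaults but should be verified in your account, and the region and bucket name below are placeholders.

import boto3

region = 'eu-central-1'                 # assumption: use your own region
bucket_name = 'my-emr-results-bucket'   # assumption: pick a globally unique name

ec2 = boto3.client('ec2', region_name=region)
s3 = boto3.client('s3', region_name=region)

# Find the default security groups EMR created during the manual launch.
groups = ec2.describe_security_groups(
    Filters=[{'Name': 'group-name',
              'Values': ['ElasticMapReduce-master', 'ElasticMapReduce-slave']}]
)
for group in groups['SecurityGroups']:
    print(group['GroupName'], group['GroupId'])

# Create the bucket that will hold the Spark job file and its output.
s3.create_bucket(
    Bucket=bucket_name,
    CreateBucketConfiguration={'LocationConstraint': region}
)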

The whole automation solution consists of two Python files. The first one is the Spark job itself (it will be executed on the cluster), and the second one is a launcher script that invokes EMR and passes the Spark job to it. The launcher runs locally on your machine, so you need the boto3 Python library installed to use the AWS SDK.

Contents of the job.py file:

import sys
import pyspark

sc = pyspark.SparkContext(appName="Reviews")

def to_entity(item):
    # In the review TSV files, star_rating is column 7 and review_body is column 13.
    words = item.split('\t')
    try:
        rating = int(words[7])
        comment = words[13]
        return (rating, len(comment))
    except (ValueError, IndexError):
        # The header row and any malformed row end up here and get filtered out below.
        return (None, None)

# (sum, count) accumulator functions for aggregateByKey.
def avg_sec(a, b): return (a[0] + b, a[1] + 1)
def avg_comb(a, b): return (a[0] + b[0], a[1] + b[1])
def avg_eval(a): return float(a[0]) / a[1]  # float() avoids integer division on Python 2

fileName = 'amazon_reviews_us_Camera_v1_00.tsv.gz'
dirName = 's3://amazon-reviews-pds/tsv/'
rdd = sc.textFile(dirName + fileName)

# The output location is passed by the launcher as the first argument.
outFile = sys.argv[1]

# The filter drops the (None, None) pairs produced by the header and malformed rows.
reviews = rdd.map(to_entity).filter(lambda x: x[0] is not None).persist()

init = (0, 0)
results = reviews.aggregateByKey(init, avg_sec, avg_comb).mapValues(avg_eval)
results.saveAsTextFile(outFile)
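
The aggregateByKey call accumulates a (sum, count) pair per star rating, and mapValues turns each pair into an average. If you want to sanity-check that logic without a cluster, a small local run over hand-made (rating, comment_length) pairs does the trick (this sketch assumes pyspark is installed locally):

import pyspark

sc = pyspark.SparkContext(master="local[1]", appName="ReviewsLocalTest")

# Hypothetical (rating, comment_length) pairs standing in for the real dataset.
pairs = sc.parallelize([(5, 10), (5, 30), (1, 100), (1, 50), (3, 40)])

init = (0, 0)
sums_counts = pairs.aggregateByKey(
    init,
    lambda acc, length: (acc[0] + length, acc[1] + 1),  # within a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1])             # across partitions
)
averages = sums_counts.mapValues(lambda acc: float(acc[0]) / acc[1])

print(averages.collect())  # expected: [(5, 20.0), (1, 75.0), (3, 40.0)] in some order
sc.stop()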

Contents of the launcher.py file:

import boto3
import time
import argparse

parser = argparse.ArgumentParser(description='Launches Spark job on AWS EMR')

parser.add_argument('aws_access_key', metavar='ACCESS_KEY', help='AWS Access Key')
parser.add_argument('aws_secret_key', metavar='SECRET_KEY', help='AWS Secret Key')
parser.add_argument('aws_region', metavar='REGION', help='AWS Region')
parser.add_argument('bucket', metavar='BUCKET', help='S3 Bucket')
parser.add_argument('job_file', metavar='JOB_FILE', help='Spark Job file')
parser.add_argument('result_folder', metavar='RESULT_FOLDER', help='S3 folder for results')
parser.add_argument('cluster_name', metavar='CLUSTER_NAME', help='EMR Cluster Name')
parser.add_argument('key_name', metavar='SSH_KEY_NAME', help='SSH Key Name')
parser.add_argument('master_sg', metavar='MASTER_SG', help='Security Group ID for Master instance group')
parser.add_argument('slave_sg', metavar='SLAVE_SG', help='Security Group ID for Slave instance group')

args = parser.parse_args()

client = boto3.client(
    'emr',
    aws_access_key_id=args.aws_access_key,
    aws_secret_access_key=args.aws_secret_key,
    region_name=args.aws_region
)

s3Client = boto3.client(
    's3',
    aws_access_key_id=args.aws_access_key,
    aws_secret_access_key=args.aws_secret_key,
    region_name=args.aws_region
)

# A timestamp suffix keeps job files and result folders from separate runs apart.
stamp = str(int(time.time()))
s3JobFileName = 'job_' + stamp + '.py'
s3ResultFolderName = args.result_folder + '_' + stamp

# Upload the Spark job script to S3 so the cluster can fetch it.
s3Client.upload_file(args.job_file, args.bucket, s3JobFileName)

response = client.run_job_flow(
    Name=args.cluster_name,
    # EMR log destination; replace with a log bucket from your own account/region.
    LogUri='s3://aws-logs-511622038217-eu-central-1/elasticmapreduce/',
    ReleaseLabel='emr-5.17.0',
    Instances={
        # One master and four slave (core) instances.
        'MasterInstanceType': 'm4.large',
        'SlaveInstanceType': 'm4.large',
        'InstanceCount': 5,
        'Ec2KeyName': args.key_name,
        # The cluster terminates automatically once all steps have finished.
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
        'HadoopVersion': '2.8.4',
        'EmrManagedMasterSecurityGroup': args.master_sg,
        'EmrManagedSlaveSecurityGroup': args.slave_sg
    },
    Steps=[
        {
            'Name': 'Spark job',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    'spark-submit',
                    '--deploy-mode',
                    'cluster',
                    # The job file uploaded above, followed by its only argument:
                    # the S3 output folder (sys.argv[1] inside job.py).
                    's3://' + args.bucket + '/' + s3JobFileName,
                    's3://' + args.bucket + '/' + s3ResultFolderName
                ]
            }
        },
    ],
    Applications=[
        {
            'Name': 'Spark'
        },
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    ScaleDownBehavior='TERMINATE_AT_TASK_COMPLETION',
    EbsRootVolumeSize=32
)

print("Response: " + str(response))

Since launcher.py requires many parameters, it is easier to invoke it via a template shell script containing this command:

python3 launcher.py \
  <AWS_KEY_ID> \
  <AWS_SECRET> \
  <REGION> \
  <S3 bucket (already created)> \
  <Spark job file (local)> \
  <Output S3 folder name> \
  <CLUSTER_NAME> \
  <EC2_SSH_KEY_NAME> \
  <SECURITY_GROUP_ID_FOR_MASTER_INSTANCE (already created)> \
  <SECURITY_GROUP_ID_FOR_SLAVE_INSTANCES (already created)>

Remember to provide an absolute local path for the Spark job file (such as ‘/home/user/job.py’) and just a folder name (not an absolute path) for the resulting data in S3 (such as ‘results’). After successful execution, you will be able to see the results in your S3 folder.
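
If you would rather inspect the output from code than from the S3 console, you can read the part files that saveAsTextFile produced. This is a sketch, assuming your credentials are configured; the bucket and folder names below are placeholders standing in for the values from your launcher run:

import boto3

bucket = 'my-emr-results-bucket'   # assumption: the bucket passed to launcher.py
prefix = 'results_1537000000/'     # assumption: result folder name plus timestamp suffix

s3 = boto3.client('s3')

# saveAsTextFile writes one part-xxxxx file per partition plus a _SUCCESS marker.
listing = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in listing.get('Contents', []):
    key = obj['Key']
    if 'part-' in key:
        body = s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')
        print(body)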

The source code for this example can be found on GitHub, here. You can modify it to meet your specific requirements and make your big data workload more automated.


Topics:
big data, aws, hadoop tutorial, python for data science, python tutorial

