
Run Your Hadoop MapReduce Job on Amazon EMR


A while ago I posted about how to set up an Amazon Elastic MapReduce (EMR) cluster by using the CLI. In this post I will show you how to set up the cluster using the Java SDK for AWS.
The best way to illustrate how to do this with the Java AWS SDK is to show a complete example, so let's start.

  • Set Up a New Maven Project
  • For this task, I created a new default Maven project. The main class in this project is the one that you can run to start the EMR cluster and perform the MapReduce job I created in an earlier post:


package net.pascalalma.aws.emr;
 
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.*;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
 
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;
 
/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 22-07-13
 * Time: 20:45
 */
public class MyClient {
 
    private static final String HADOOP_VERSION = "1.0.3";
    private static final int INSTANCE_COUNT = 1;
    private static final String INSTANCE_TYPE = InstanceType.M1Small.toString();
    private static final UUID RANDOM_UUID = UUID.randomUUID();
    private static final String FLOW_NAME = "dictionary-" + RANDOM_UUID.toString();
    private static final String BUCKET_NAME = "map-reduce-intro";
    private static final String S3N_HADOOP_JAR =
            "s3n://" + BUCKET_NAME + "/job/MapReduce-1.0-SNAPSHOT.jar";
    private static final String S3N_LOG_URI = "s3n://" + BUCKET_NAME + "/log/";
    private static final String[] JOB_ARGS =
            new String[]{"s3n://" + BUCKET_NAME + "/input/input.txt",
                    "s3n://" + BUCKET_NAME + "/result/" + FLOW_NAME};
    private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
    private static final List<JobFlowExecutionState> DONE_STATES = Arrays
            .asList(new JobFlowExecutionState[]{JobFlowExecutionState.COMPLETED,
                    JobFlowExecutionState.FAILED,
                    JobFlowExecutionState.TERMINATED});
    static AmazonS3 s3;
    static AmazonElasticMapReduceClient emr;
 
    private static void init() throws Exception {
        AWSCredentials credentials = new PropertiesCredentials(
                MyClient.class.getClassLoader().getResourceAsStream("AwsCredentials.properties"));
        s3 = new AmazonS3Client(credentials);
        emr = new AmazonElasticMapReduceClient(credentials);
        emr.setRegion(Region.getRegion(Regions.EU_WEST_1));
    }
 
    private static JobFlowInstancesConfig configInstance() throws Exception {
 
        // Configure instances to use
        JobFlowInstancesConfig instance = new JobFlowInstancesConfig();
        instance.setHadoopVersion(HADOOP_VERSION);
        instance.setInstanceCount(INSTANCE_COUNT);
        instance.setMasterInstanceType(INSTANCE_TYPE);
        instance.setSlaveInstanceType(INSTANCE_TYPE);
        // instance.setKeepJobFlowAliveWhenNoSteps(true);
        // instance.setEc2KeyName("4synergy_palma");
 
        return instance;
    }
 
    private static void runCluster() throws Exception {
        // Configure the job flow
        RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, configInstance());
        request.setLogUri(S3N_LOG_URI);
 
        // Configure the Hadoop jar to use
        HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
        jarConfig.setArgs(ARGS_AS_LIST);
 
        try {
 
            StepConfig enableDebugging = new StepConfig()
                    .withName("Enable debugging")
                    .withActionOnFailure("TERMINATE_JOB_FLOW")
                    .withHadoopJarStep(new StepFactory().newEnableDebuggingStep());
 
            StepConfig runJar =
                    new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.indexOf('/') + 1),
                            jarConfig);
 
            request.setSteps(Arrays.asList(new StepConfig[]{enableDebugging, runJar}));
 
            //Run the job flow
            RunJobFlowResult result = emr.runJobFlow(request);
 
            //Check the status of the running job
            String lastState = "";
 
            STATUS_LOOP:
            while (true) {
                DescribeJobFlowsRequest desc =
                        new DescribeJobFlowsRequest(
                                Arrays.asList(new String[]{result.getJobFlowId()}));
                DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
                for (JobFlowDetail detail : descResult.getJobFlows()) {
                    String state = detail.getExecutionStatusDetail().getState();
                    if (isDone(state)) {
                        System.out.println("Job " + state + ": " + detail.toString());
                        break STATUS_LOOP;
                    } else if (!lastState.equals(state)) {
                        lastState = state;
                        System.out.println("Job " + state + " at " + new Date().toString());
                    }
                }
                Thread.sleep(10000);
            }
        } catch (AmazonServiceException ase) {
            System.out.println("Caught Exception: " + ase.getMessage());
            System.out.println("Response Status Code: " + ase.getStatusCode());
            System.out.println("Error Code: " + ase.getErrorCode());
            System.out.println("Request ID: " + ase.getRequestId());
        }
    }
 
    public static boolean isDone(String value) {
        JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
        return DONE_STATES.contains(state);
    }
 
    public static void main(String[] args) {
        try {
            init();
            runCluster();
        } catch (Exception e) {
            e.printStackTrace(); 
        }
    }
}

In this class, I first declare some constants, which should be self-explanatory. In the init() method, I load the credentials properties file that I added to the ‘src/main/resources’ folder of my Maven project. It contains my access key and secret key.
I also set the region for the EMR client to ‘EU_WEST_1’ (eu-west-1).
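As an illustration of the credentials file: the SDK's PropertiesCredentials class reads the access key from a property named accessKey and the secret key from a property named secretKey. The following is a minimal sketch that only uses the JDK to check whether a file's contents define both properties before the cluster is started. The class CredentialsFileCheck and its method are hypothetical helpers, not part of the AWS SDK, and the sample values are placeholders.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of the AWS SDK): verifies that the contents
// of AwsCredentials.properties define the two properties the SDK reads.
final class CredentialsFileCheck {

    private static final String[] REQUIRED_KEYS = {"accessKey", "secretKey"};

    // Returns the required property names that are absent or blank.
    static List<String> missingKeys(String fileContents) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContents));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder values; a real file holds your own keys.
        String sample = "accessKey=YOUR_ACCESS_KEY_ID\nsecretKey=YOUR_SECRET_KEY\n";
        System.out.println("Missing properties: " + missingKeys(sample));
    }
}
```

A check like this could run at the start of init(), so that a misnamed property fails fast with a clear message instead of surfacing later as an authentication error.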

The next method is ‘configInstance()’. In this method, I create and configure the JobFlowInstancesConfig by setting the Hadoop version, the number of instances, the instance type, etc. You can also enable the ‘KeepJobFlowAliveWhenNoSteps’ setting (commented out in the code) to keep the cluster alive after the jobs have finished, which can be helpful in some cases. If you use this option, it is also useful to set the key pair you want to use to access the cluster, because I wasn’t able to access the cluster without setting this key.

The method ‘runCluster()’ is where the cluster actually runs. It creates the request to start the cluster and adds the steps that have to be executed. In our case, one of the steps runs the JAR file we created in the previous steps. I also added a debug step so we have access to the debug logging after the cluster has finished and terminated. We can simply access the log files in the S3 bucket location that I set with the constant ‘S3N_LOG_URI’.
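One detail of the step configuration is worth spelling out: the step name is derived from the JAR location with substring(indexOf('/') + 1), which cuts after the first slash of the ‘s3n://’ prefix. The name therefore keeps a leading slash and the bucket name, which is why the step appears under that long name in the console output of the run. The sketch below compares that expression with an alternative based on lastIndexOf that keeps only the file name. StepNames is a hypothetical helper class used purely for illustration.

```java
// Hypothetical helper illustrating two ways to derive a step name
// from the JAR location; only the first is what runCluster() does.
final class StepNames {

    // Same expression as in runCluster(): cuts after the FIRST slash.
    static String afterFirstSlash(String uri) {
        return uri.substring(uri.indexOf('/') + 1);
    }

    // Alternative: cuts after the LAST slash, leaving only the file name.
    static String fileName(String uri) {
        return uri.substring(uri.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        String jar = "s3n://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar";
        // prints /map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
        System.out.println(afterFirstSlash(jar));
        // prints MapReduce-1.0-SNAPSHOT.jar
        System.out.println(fileName(jar));
    }
}
```

Either name works for EMR; the shorter one is just easier to read in the console and in logs.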

When this request is created, we start the cluster based on this request. Then we poll every ten seconds to see whether the job has finished and show a message in the console indicating the current state of the job.
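The polling logic can be looked at in isolation. The following is a minimal sketch of the same idea, with the EMR call replaced by a plain Supplier<String> so that it runs without an AWS account: it reports each state change once and stops as soon as one of the final states (COMPLETED, FAILED, TERMINATED) is seen. JobPoller and pollUntilDone are hypothetical names, and the sequence of states in main() is made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-alone version of the status loop in runCluster().
final class JobPoller {

    private static final List<String> DONE_STATES =
            Arrays.asList("COMPLETED", "FAILED", "TERMINATED");

    // Polls stateSource until a final state is reported and returns the
    // distinct consecutive states that were observed, in order.
    static List<String> pollUntilDone(Supplier<String> stateSource, long intervalMillis) {
        List<String> transitions = new ArrayList<>();
        String lastState = "";
        while (true) {
            String state = stateSource.get();
            if (!lastState.equals(state)) {
                // Only report a state when it differs from the previous poll.
                lastState = state;
                transitions.add(state);
                System.out.println("Job " + state);
            }
            if (DONE_STATES.contains(state)) {
                return transitions;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
        }
    }

    public static void main(String[] args) {
        // Made-up state sequence standing in for the describeJobFlows() result.
        Iterator<String> fakeStates = Arrays.asList(
                "STARTING", "STARTING", "RUNNING", "SHUTTING_DOWN", "COMPLETED").iterator();
        pollUntilDone(fakeStates::next, 10);
    }
}
```

Separating the loop from the SDK call this way also makes it possible to unit test the polling behaviour with a fake state source, in the same spirit as unit testing the MapReduce job itself.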

To execute the first run, we have to prepare the input.

  • Prepare the Input
  • As input for the job (see my earlier post for more info about this example job), we have to make the dictionary contents available to the EMR cluster. Furthermore, we have to make the JAR file available and make sure the output and log directories exist in our S3 bucket. There are several ways to do this: you can do it programmatically by using the SDK, from the command line by using S3cmd, or by using the AWS Management Console. You should end up with a setup similar to this:

    • s3://map-reduce-intro
    • s3://map-reduce-intro/input
    • s3://map-reduce-intro/input/input.txt
    • s3://map-reduce-intro/job
    • s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
    • s3://map-reduce-intro/log
    • s3://map-reduce-intro/result

    When using S3cmd, it looks like this:


s3cmd-1.5.0-alpha1$ s3cmd ls --recursive s3://map-reduce-intro/
2013-07-20 13:06    469941   s3://map-reduce-intro/input/input.txt
2013-07-20 14:12      5491   s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
2013-08-06 14:30         0   s3://map-reduce-intro/log/
2013-08-06 14:27         0   s3://map-reduce-intro/result/

In the example above, I already introduced an S3 client in the code. You can also use that to prepare the input or get the output as part of the client’s job.
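Before starting the cluster, it can be useful to verify that the objects the job depends on are actually in the bucket. The sketch below assumes you have already obtained the object keys of the bucket as plain strings (for example through the S3 client; fetching the listing is not shown here) and reports which required keys are missing. InputCheck is a hypothetical helper, and the required keys are taken from the bucket layout shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical pre-flight check: compares a listing of object keys with
// the objects the job needs (the input file and the job JAR).
final class InputCheck {

    private static final List<String> REQUIRED_KEYS = Arrays.asList(
            "input/input.txt",
            "job/MapReduce-1.0-SNAPSHOT.jar");

    // Returns the required keys that do not occur in the given listing.
    static List<String> missingKeys(Collection<String> existingKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (!existingKeys.contains(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Keys as they appear in the s3cmd listing, relative to the bucket.
        List<String> listing = Arrays.asList(
                "input/input.txt",
                "job/MapReduce-1.0-SNAPSHOT.jar",
                "log/",
                "result/");
        System.out.println("Missing objects: " + missingKeys(listing));
    }
}
```

If the returned list is not empty, the client can stop before any EC2 instances are started, which saves both time and money compared to letting the step fail on the cluster.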

  • Run the Cluster
  • When everything is in place, we can run the job. I simply run the main method of ‘MyClient’ in IntelliJ and get the following output in my console:


Job STARTING at Tue Aug 06 16:31:55 CEST 2013
Job RUNNING at Tue Aug 06 16:36:18 CEST 2013
Job SHUTTING_DOWN at Tue Aug 06 16:38:40 CEST 2013
Job COMPLETED: {
  JobFlowId: j-JDB14HVTRC1L
  ,Name: dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43
  ,LogUri: s3n://map-reduce-intro/log/,AmiVersion: 2.4.0
  ,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
    ,StartDateTime: Tue Aug 06 16:36:14 CEST 2013
    ,ReadyDateTime: Tue Aug 06 16:36:14 CEST 2013
    ,EndDateTime: Tue Aug 06 16:39:02 CEST 2013
    ,LastStateChangeReason: Steps completed}
  ,Instances: {MasterInstanceType: m1.small
    ,MasterPublicDnsName: ec2-54-216-104-11.eu-west-1.compute.amazonaws.com
    ,MasterInstanceId: i-93268ddf
    ,InstanceCount: 1
    ,InstanceGroups: [{InstanceGroupId: ig-2LURHNAK5NVKZ
      ,Name: master
      ,Market: ON_DEMAND
      ,InstanceRole: MASTER
      ,InstanceType: m1.small
      ,InstanceRequestCount: 1
      ,InstanceRunningCount: 0
      ,State: ENDED
      ,LastStateChangeReason: Job flow terminated
      ,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
      ,StartDateTime: Tue Aug 06 16:34:28 CEST 2013
      ,ReadyDateTime: Tue Aug 06 16:36:10 CEST 2013
      ,EndDateTime: Tue Aug 06 16:39:02 CEST 2013}]
    ,NormalizedInstanceHours: 1
    ,Ec2KeyName: 4synergy_palma
    ,Placement: {AvailabilityZone: eu-west-1a}
    ,KeepJobFlowAliveWhenNoSteps: false
    ,TerminationProtected: false
    ,HadoopVersion: 1.0.3}
  ,Steps: [
    {StepConfig: {Name: Enable debugging
      ,ActionOnFailure: TERMINATE_JOB_FLOW
      ,HadoopJarStep: {Properties: []
        ,Jar: s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar
        ,Args: [s3://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch]}
    }
    ,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
      ,StartDateTime: Tue Aug 06 16:36:12 CEST 2013
      ,EndDateTime: Tue Aug 06 16:36:40 CEST 2013
      ,}
    }
  , {StepConfig: {Name: /map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
      ,ActionOnFailure: TERMINATE_JOB_FLOW
      ,HadoopJarStep: {Properties: []
        ,Jar: s3n://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
        ,Args: [s3n://map-reduce-intro/input/input.txt
          , s3n://map-reduce-intro/result/dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43]}
    }
    ,ExecutionStatusDetail: {State: COMPLETED
      ,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
      ,StartDateTime: Tue Aug 06 16:36:40 CEST 2013
      ,EndDateTime: Tue Aug 06 16:38:10 CEST 2013
      ,}
    }]
  ,BootstrapActions: []
  ,SupportedProducts: []
  ,VisibleToAllUsers: false
,}
Process finished with exit code 0

And of course, we have a result in the ‘result’ folder that we configured in our S3 bucket:
[Screenshot: the result folder in the S3 bucket]
I transfer the result to my local machine and have a look at it:
[Screenshot: contents of the result file]

That concludes this simple but, I think, rather complete example of creating a Hadoop job and running it on a cluster after unit testing it, as we would do with all our software.

With this setup as a base, it is quite easy to come up with more complex business cases and have these tested and configured to be run on AWS EMR.


Published at DZone with permission of Pascal Alma, DZone MVB. See the original article here.
