
Run Your Hadoop MapReduce Job on Amazon EMR


A while ago I posted about how to set up an Amazon Elastic MapReduce cluster using the CLI. In this post I will show you how to set up the cluster using the Java SDK for AWS.
The best way to illustrate how to do this with the Java AWS SDK is to walk through a complete example, so let's start.

  • Set up a new Maven project
  • For this task, I created a new default Maven project. The main class in this project is the one you can run to initiate the EMR cluster and perform the MapReduce job I created in this post:


package net.pascalalma.aws.emr;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.*;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 22-07-13
 * Time: 20:45
 */
public class MyClient {

    private static final String HADOOP_VERSION = "1.0.3";
    private static final int INSTANCE_COUNT = 1;
    private static final String INSTANCE_TYPE = InstanceType.M1Small.toString();
    private static final UUID RANDOM_UUID = UUID.randomUUID();
    private static final String FLOW_NAME = "dictionary-" + RANDOM_UUID.toString();
    private static final String BUCKET_NAME = "map-reduce-intro";
    private static final String S3N_HADOOP_JAR =
            "s3n://" + BUCKET_NAME + "/job/mapreduce-1.0-SNAPSHOT.jar";
    private static final String S3N_LOG_URI = "s3n://" + BUCKET_NAME + "/log/";
    private static final String[] JOB_ARGS =
            new String[]{"s3n://" + BUCKET_NAME + "/input/input.txt",
                    "s3n://" + BUCKET_NAME + "/result/" + FLOW_NAME};
    private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
    private static final List<JobFlowExecutionState> DONE_STATES = Arrays
            .asList(new JobFlowExecutionState[]{JobFlowExecutionState.COMPLETED,
                    JobFlowExecutionState.FAILED,
                    JobFlowExecutionState.TERMINATED});
    static AmazonS3 s3;
    static AmazonElasticMapReduceClient emr;

    private static void init() throws Exception {
        AWSCredentials credentials = new PropertiesCredentials(
                MyClient.class.getClassLoader().getResourceAsStream("AwsCredentials.properties"));
        s3 = new AmazonS3Client(credentials);
        emr = new AmazonElasticMapReduceClient(credentials);
        emr.setRegion(Region.getRegion(Regions.EU_WEST_1));
    }

    private static JobFlowInstancesConfig configInstance() throws Exception {

        // Configure instances to use
        JobFlowInstancesConfig instance = new JobFlowInstancesConfig();
        instance.setHadoopVersion(HADOOP_VERSION);
        instance.setInstanceCount(INSTANCE_COUNT);
        instance.setMasterInstanceType(INSTANCE_TYPE);
        instance.setSlaveInstanceType(INSTANCE_TYPE);
        // instance.setKeepJobFlowAliveWhenNoSteps(true);
        // instance.setEc2KeyName("4synergy_palma");

        return instance;
    }

    private static void runCluster() throws Exception {
        // Configure the job flow
        RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, configInstance());
        request.setLogUri(S3N_LOG_URI);

        // Configure the Hadoop jar to use
        HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
        jarConfig.setArgs(ARGS_AS_LIST);

        try {

            StepConfig enableDebugging = new StepConfig()
                    .withName("Enable debugging")
                    .withActionOnFailure("TERMINATE_JOB_FLOW")
                    .withHadoopJarStep(new StepFactory().newEnableDebuggingStep());

            StepConfig runJar =
                    new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.indexOf('/') + 1),
                            jarConfig);

            request.setSteps(Arrays.asList(new StepConfig[]{enableDebugging, runJar}));

            // Run the job flow
            RunJobFlowResult result = emr.runJobFlow(request);

            // Check the status of the running job
            String lastState = "";

            STATUS_LOOP:
            while (true) {
                DescribeJobFlowsRequest desc =
                        new DescribeJobFlowsRequest(
                                Arrays.asList(new String[]{result.getJobFlowId()}));
                DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
                for (JobFlowDetail detail : descResult.getJobFlows()) {
                    String state = detail.getExecutionStatusDetail().getState();
                    if (isDone(state)) {
                        System.out.println("Job " + state + ": " + detail.toString());
                        break STATUS_LOOP;
                    } else if (!lastState.equals(state)) {
                        lastState = state;
                        System.out.println("Job " + state + " at " + new Date().toString());
                    }
                }
                Thread.sleep(10000);
            }
        } catch (AmazonServiceException ase) {
            System.out.println("Caught Exception: " + ase.getMessage());
            System.out.println("Response Status Code: " + ase.getStatusCode());
            System.out.println("Error Code: " + ase.getErrorCode());
            System.out.println("Request ID: " + ase.getRequestId());
        }
    }

    public static boolean isDone(String value) {
        JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
        return DONE_STATES.contains(state);
    }

    public static void main(String[] args) {
        try {
            init();
            runCluster();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

In this class, I declare some constants first; I assume these are self-explanatory. In the init() method, I use the credentials properties file that I added to the project. I added this file to the ‘/main/resources’ folder of my Maven project. It contains my access key and secret key.
Also, I set the region to ‘eu-west-1’ for the EMR client.
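
The PropertiesCredentials class reads the credentials from the two property keys accessKey and secretKey, so a minimal credentials file (the exact file name AwsCredentials.properties is my assumption; the values below are placeholders, not real credentials) would contain something like:

accessKey=YOUR_ACCESS_KEY_ID
secretKey=YOUR_SECRET_ACCESS_KEY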

The next method is ‘configInstance()’. In this method, I create and configure the JobFlowInstancesConfig by setting the Hadoop version, the number of instances, the instance size, and so on. You can also configure the ‘KeepJobFlowAliveWhenNoSteps’ setting to keep the cluster alive after the jobs have finished, which can be helpful in some cases. If you want to use this option, it is also useful to set the key pair you want to use to access the cluster, because I wasn't able to access the cluster without setting this key.
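
If you do want to keep the cluster alive, the same configuration can be written with the fluent ‘with’ variants of the setters. This is only a sketch of that option, not part of the original client; it reuses the constants from MyClient above, and ‘my-key-pair’ is a placeholder for an EC2 key pair that actually exists in your account and region:

JobFlowInstancesConfig instance = new JobFlowInstancesConfig()
        .withHadoopVersion(HADOOP_VERSION)
        .withInstanceCount(INSTANCE_COUNT)
        .withMasterInstanceType(INSTANCE_TYPE)
        .withSlaveInstanceType(INSTANCE_TYPE)
        // keep the cluster running after the last step finishes
        .withKeepJobFlowAliveWhenNoSteps(true)
        // key pair needed to SSH into the master node (placeholder name)
        .withEc2KeyName("my-key-pair");

Keep in mind that with this option the cluster keeps running (and costing money) until you terminate it yourself.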

The method ‘runCluster()’ is where the cluster is actually started. It creates the request to initiate the cluster, and the steps that have to be executed are added to this request. In our case, one of the steps is running the JAR file we created in the previous steps. I also added a debug step so we have access to the debug logging after the cluster has finished and terminated. We can simply access the log files in the S3 bucket that I set with the constant ‘S3N_LOG_URI’.

When this request has been created, we start the cluster based on it. We then poll every ten seconds to see whether the job has finished and print a message to the console indicating the current state of the job.

To execute the first run, we have to prepare the input.

  • Prepare the input
  • As input for the job (see the post mentioned above for more info about this example job), we have to make the dictionary contents available to the EMR cluster. Furthermore, we have to make the JAR file available and make sure the output and log directories exist in our S3 bucket. There are several ways to do this: programmatically by using the SDK (a sketch follows further down), from the command line using s3cmd, or by using the AWS Management Console. You should end up with a setup similar to this:

    • s3://map-reduce-intro
    • s3://map-reduce-intro/input
    • s3://map-reduce-intro/input/input.txt
    • s3://map-reduce-intro/job
    • s3://map-reduce-intro/job/mapreduce-1.0-SNAPSHOT.jar
    • s3://map-reduce-intro/log
    • s3://map-reduce-intro/result

    When using s3cmd, the listing looks like this:


s3cmd-1.5.0-alpha1$ s3cmd ls --recursive s3://map-reduce-intro/
2013-07-20 13:06    469941   s3://map-reduce-intro/input/input.txt
2013-07-20 14:12      5491   s3://map-reduce-intro/job/mapreduce-1.0-SNAPSHOT.jar
2013-08-06 14:30         0   s3://map-reduce-intro/log/
2013-08-06 14:27         0   s3://map-reduce-intro/result/

In the example above, I already created an S3 client in the code. You can also use that client to prepare the input, or to fetch the output, as part of the client's job.
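
For completeness, here is a rough sketch (not in the original code) of how that same AmazonS3 client could upload the input file and the job JAR before starting the cluster. The local file paths are placeholders, and java.io.File would need to be imported into MyClient:

private static void prepareInput() {
    // Create the bucket if it does not exist yet
    if (!s3.doesBucketExist(BUCKET_NAME)) {
        s3.createBucket(BUCKET_NAME);
    }
    // Upload the dictionary input and the MapReduce job JAR
    s3.putObject(BUCKET_NAME, "input/input.txt",
            new java.io.File("input.txt"));
    s3.putObject(BUCKET_NAME, "job/mapreduce-1.0-SNAPSHOT.jar",
            new java.io.File("target/mapreduce-1.0-SNAPSHOT.jar"));
}

Calling prepareInput() from main() before runCluster() would then replace the manual s3cmd or console steps.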

  • Run the cluster
  • When everything is in place, we can run the job. I simply run the main method of ‘MyClient’ in IntelliJ and get the following output in my console:


Job STARTING at Tue Aug 06 16:31:55 CEST 2013
Job RUNNING at Tue Aug 06 16:36:18 CEST 2013
Job SHUTTING_DOWN at Tue Aug 06 16:38:40 CEST 2013
Job COMPLETED: {
  JobFlowId: j-JDB14HVTRC1L
  ,Name: dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43
  ,LogUri: s3n://map-reduce-intro/log/,AmiVersion: 2.4.0
  ,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
    ,StartDateTime: Tue Aug 06 16:36:14 CEST 2013
    ,ReadyDateTime: Tue Aug 06 16:36:14 CEST 2013
    ,EndDateTime: Tue Aug 06 16:39:02 CEST 2013
    ,LastStateChangeReason: Steps completed}
  ,Instances: {MasterInstanceType: m1.small
    ,MasterPublicDnsName: ec2-54-216-104-11.eu-west-1.compute.amazonaws.com
    ,MasterInstanceId: i-93268ddf
    ,InstanceCount: 1
    ,InstanceGroups: [{InstanceGroupId: ig-2LURHNAK5NVKZ
      ,Name: master
      ,Market: ON_DEMAND
      ,InstanceRole: MASTER
      ,InstanceType: m1.small
      ,InstanceRequestCount: 1
      ,InstanceRunningCount: 0
      ,State: ENDED
      ,LastStateChangeReason: Job flow terminated
      ,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
      ,StartDateTime: Tue Aug 06 16:34:28 CEST 2013
      ,ReadyDateTime: Tue Aug 06 16:36:10 CEST 2013
      ,EndDateTime: Tue Aug 06 16:39:02 CEST 2013}]
    ,NormalizedInstanceHours: 1
    ,Ec2KeyName: 4synergy_palma
    ,Placement: {AvailabilityZone: eu-west-1a}
    ,KeepJobFlowAliveWhenNoSteps: false
    ,TerminationProtected: false
    ,HadoopVersion: 1.0.3}
  ,Steps: [
    {StepConfig: {Name: Enable debugging
      ,ActionOnFailure: TERMINATE_JOB_FLOW
      ,HadoopJarStep: {Properties: []
        ,Jar: s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar
        ,Args: [s3://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch]}
    }
    ,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
      ,StartDateTime: Tue Aug 06 16:36:12 CEST 2013
      ,EndDateTime: Tue Aug 06 16:36:40 CEST 2013
      ,}
    }
  , {StepConfig: {Name: /map-reduce-intro/job/mapreduce-1.0-SNAPSHOT.jar
      ,ActionOnFailure: TERMINATE_JOB_FLOW
      ,HadoopJarStep: {Properties: []
        ,Jar: s3n://map-reduce-intro/job/mapreduce-1.0-SNAPSHOT.jar
        ,Args: [s3n://map-reduce-intro/input/input.txt
          , s3n://map-reduce-intro/result/dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43]}
    }
    ,ExecutionStatusDetail: {State: COMPLETED
      ,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
      ,StartDateTime: Tue Aug 06 16:36:40 CEST 2013
      ,EndDateTime: Tue Aug 06 16:38:10 CEST 2013
      ,}
    }]
  ,BootstrapActions: []
  ,SupportedProducts: []
  ,VisibleToAllUsers: false
,}
Process finished with exit code 0

And of course, we have a result in the ‘result’ folder that we configured in our S3 bucket:
(screenshot: the result folder in the S3 bucket)
I transfer the result to my local machine and have a look at it:
(screenshot: the contents of the resulting output file)

So that concludes this simple but, I think, rather complete example of creating a Hadoop job and running it on a cluster after having unit tested it, as we would do with all our software.

With this setup as a base, it is quite easy to come up with more complex business cases and have them tested and configured to run on AWS EMR.


Published at DZone with permission of $$anonymous$$. See the original article here.

Opinions expressed by DZone contributors are their own.
