
Introduction to Hadoop MapReduce


An introduction to writing Hadoop MapReduce jobs in Java, illustrated with an example that finds the maximum temperature for each year.


In this article, we will look at Hadoop MapReduce. To process large datasets quickly, we need to process the data in parallel, and that is exactly what Hadoop MapReduce provides.

MapReduce: MapReduce is a programming model for data processing. MapReduce programs are inherently parallel, putting very large-scale data analysis into the hands of anyone with enough machines at their disposal. MapReduce works by breaking the processing into two phases: the map phase and the reduce phase.

Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function.
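As a rough sketch of what these two functions look like in Hadoop's newer org.apache.hadoop.mapreduce API, consider the identity-style classes below. They are purely illustrative (not part of this article's example) and simply pass every key-value pair through unchanged, but they show where the four key/value types and the two function bodies go.

import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// The four type parameters of Mapper/Reducer are: input key, input value, output key, output value.
class PassThroughMapper<K, V> extends Mapper<K, V, K, V> {
    @Override
    protected void map(K key, V value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value); // emit each input pair unchanged
    }
}

class PassThroughReducer<K, V> extends Reducer<K, V, K, V> {
    @Override
    protected void reduce(K key, Iterable<V> values, Context context)
            throws IOException, InterruptedException {
        for (V value : values) {
            context.write(key, value); // emit every value grouped under this key
        }
    }
}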

The Map Function: The input key is the offset of the beginning of the line from the beginning of the file. The map function sets up the data in such a way that the reduce function can do its work on it. The map function is also a good place to drop bad records, so we generally filter out just the data we need to process. To provide the body of the map function, we extend the Mapper class. To understand the map function better, let's look at an example using NCDC raw weather data:

We need to find the maximum temperature for each year.

0067011990999991950051507004…9999999N9+0000+99999999999…
0043011990999991950051512004…9999999N9+0022+199999999999…
0043011990999991950051518004…9999999N9-0011+99999999999…
0043012650999991949032412004…0500001N9+0111+99999999999…
0043012650999991949032418004…0500001N9+0078+99999999999…

So, the input for our map function is something like this:

(0, 0067011990999991950051507004…9999999N9+000099999999999…)
(106, 0043011990999991950051512004…9999999N9+0022+99999999999…)
(212, 0043011990999991950051518004…9999999N9-0011+99999999999…)
(318, 0043012650999991949032412004…0500001N9+0111+99999999999…)
(424, 0043012650999991949032418004…0500001N9+0078+99999999999…)

The keys are the line offsets within the file, which we ignore in our map function. The map function merely extracts the year and the air temperature and emits them as its output (the temperature values have been interpreted as integers):

(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)

Code for Map Function:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {
            // parseInt doesn't like leading plus signs
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

The Mapper class is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function. For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer). The map() method also provides an instance of Context to write the output to. In this case, we write the year as a Text object (since we are just using it as a string) and wrap the temperature in an IntWritable.
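If the Writable wrapper types are unfamiliar, the small standalone snippet below (separate from the job itself) shows that Text, IntWritable, and LongWritable simply wrap ordinary Java values in Hadoop's own serializable types.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // Hadoop uses its own serializable wrappers instead of String/int/long.
        Text year = new Text("1950");                  // wraps a UTF-8 string
        IntWritable temperature = new IntWritable(22); // wraps a Java int
        LongWritable offset = new LongWritable(106);   // wraps a Java long (e.g. a byte offset)

        // toString()/get() unwrap the plain Java values again.
        System.out.println(year.toString() + ": " + temperature.get() + " (offset " + offset.get() + ")");
    }
}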

The Reduce Function: In the reduce function, we perform the actual computation. The operation we need to perform depends entirely on the use case. The input to the reduce function has the form (key, list(values)).

Continuing the example, the input to the reduce function looks like this:

(1949, [111, 78]) 
(1950, [0, 22, −11])

Now, we need to find the maximum temperature for each year.

Code for Reduce Function:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}

Again, four formal type parameters are used to specify the input and output types, this time for the reduce function. The input types of the reduce function must match the output types of the map function.

The output of our program is something like this:

(1949, 111) 
(1950, 22)
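Before wiring everything together, the reducer can also be checked in isolation. A minimal sketch of such a unit test is shown below; it assumes the Apache MRUnit and JUnit libraries are on the classpath, neither of which is used elsewhere in this article.

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class MaxTemperatureReducerTest {

    @Test
    public void returnsMaximumIntegerInValues() throws IOException {
        // Feed one key with a list of values and assert the single expected output pair.
        new ReduceDriver<Text, IntWritable, Text, IntWritable>()
            .withReducer(new MaxTemperatureReducer())
            .withInput(new Text("1950"),
                Arrays.asList(new IntWritable(10), new IntWritable(5)))
            .withOutput(new Text("1950"), new IntWritable(10))
            .runTest();
    }
}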

The whole process of Hadoop MapReduce can be seen in the following diagram.

[Diagram: MapReduce logical data flow.]

Code for the driver that runs the MapReduce job:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory (in which case, the input forms all the files in that directory), or a file pattern.
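As an illustration (the paths below are hypothetical, not taken from this article), addInputPath() can be called several times, mixing individual files, directories, and file patterns:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class InputPathExamples {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // Each call adds one more input; a path may be a file, a directory, or a glob pattern.
        FileInputFormat.addInputPath(job, new Path("/data/ncdc/1949.txt")); // a single file
        FileInputFormat.addInputPath(job, new Path("/data/ncdc/raw"));      // every file in a directory
        FileInputFormat.addInputPath(job, new Path("/data/ncdc/19*.gz"));   // a file pattern
    }
}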

The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reduce function are written. The directory shouldn't exist before running the job, because Hadoop will complain and not run the job. This precaution is to prevent data loss.
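During development it is common to clear a stale output directory before resubmitting the job. The small sketch below is not part of the original example; it simply uses the Hadoop FileSystem API to delete the directory if it already exists.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputCleanup {
    public static void main(String[] args) throws Exception {
        Path output = new Path(args[0]); // the job's intended output directory
        FileSystem fs = FileSystem.get(new Configuration());
        // Remove the directory recursively if a previous run left it behind, so the
        // next submission does not fail with an "output directory already exists" error.
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
    }
}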

Next, we specify the map and reduce types to use via the setMapperClass() and setReducerClass() methods. The setOutputKeyClass() and setOutputValueClass() methods control the output types for the reduce function, and must match what the Reducer class produces.
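Two related settings are worth knowing about, although this example does not need them. If the map output types ever differ from the reduce output types, they must be declared separately with setMapOutputKeyClass() and setMapOutputValueClass(). And because taking a maximum is both associative and commutative, the reducer can also be registered as a combiner to cut down the data shuffled between the map and reduce phases. The hypothetical helper below illustrates both calls (it assumes it lives in the same package as MaxTemperatureReducer):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class JobTypeConfig {

    // Illustrative helper, not part of the original MaxTemperature driver.
    static void configure(Job job) {
        // Only needed when the map output types differ from the reduce output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // "Maximum" is associative and commutative, so the reducer can double as a combiner.
        job.setCombinerClass(MaxTemperatureReducer.class);
    }
}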

Reference: Tom White, Hadoop: The Definitive Guide, 4th Edition, O'Reilly Media.



Published at DZone with permission of Anuranga Samaradiwakara.
