Apache Hadoop: How MapReduce Can Essentiate Data From HDFS

In this article, we discuss some basic concepts behind MapReduce and how it can be used to process data stored in HDFS.

Data is being generated at an enormous scale. According to the World Economic Forum, 463 exabytes (1 exabyte = 10^18 bytes) of data will be generated each day by 2025. Data collected from social media channels, sensors, search queries, and other sources must be stored in reliable infrastructure that keeps it secure and readily available for processing.

This data must then be processed to obtain outcomes that increase the accuracy of predictions and calculations, so that smarter systems can be built. Hence, the authenticity of the data must be preserved. At the same time, the data must be stored in a file system that supports fast operations. Apache Hadoop makes it faster and easier to store and manage large amounts of data through its distributed file system.

Apache Hadoop and Hadoop Distributed File System (HDFS)

Apache Hadoop is an open-source framework whose software library addresses the problems of distributed storage and distributed computing. Its storage layer, the Hadoop Distributed File System (HDFS), stores data across many machines so that it can be fetched and shared easily and efficiently. Hadoop is a Java-based framework and is relatively easy to operate.

Why Use Hadoop Distributed File System (HDFS)?

Much of the data used for big data analytics is unstructured. Structured data follows a single, fixed format (for example, rows and columns in a table) and can be stored in the file system already in use. Because of its uniform format, structured data is easier and faster to process.

However, big data also includes images, audio signals, videos, sensor data, and more. To process this unstructured data, it must be stored in a large distributed file system from which it is easy both to fetch and to process.

This is why HDFS is used: it stores data across many machines in a form that is well suited to processing, and access can be restricted to authenticated users (Hadoop's secure mode uses Kerberos for authentication). Hadoop also offers a command-line interface for interacting with the stored data.

Hadoop MapReduce: Software for Processing Distributed Data

Hadoop MapReduce is a framework for efficiently processing distributed data. It is Hadoop's original processing engine, and it primarily uses Java. At its core, MapReduce is a programming model based on the principle of parallel processing.

Google needed to handle enormous volumes of data (it reportedly receives around 3.8 million searches per minute), so it introduced MapReduce as a parallel processing model running on top of its Google File System (GFS). A large task is divided into smaller tasks that are completed in parallel, which increases the speed and efficiency of processing. Like HDFS, Hadoop's implementation of MapReduce follows a master-slave architecture.
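The divide-and-conquer idea can be sketched in plain Java, without Hadoop. This is a minimal illustration that assumes the "task" is summing an array of numbers; the ParallelChunkSum class is hypothetical. It splits the input into chunks, processes each chunk on a separate thread, and then combines the partial results. Hadoop applies the same idea at cluster scale, with the chunks stored on different machines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class: illustrates parallel chunk processing, not part of Hadoop.
class ParallelChunkSum {

    // Splits data into chunks of at most chunkSize elements, sums each chunk
    // on a thread pool, then combines the partial sums into one result.
    static long sum(int[] data, int chunkSize, int threads) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> partials = new ArrayList<>();
            for (int start = 0; start < data.length; start += chunkSize) {
                final int from = start;
                final int to = Math.min(start + chunkSize, data.length);
                // Each chunk is processed independently of the others.
                partials.add(pool.submit(() -> {
                    long partial = 0;
                    for (int i = from; i < to; i++) {
                        partial += data[i];
                    }
                    return partial;
                }));
            }
            // Gathering the partial results forms the final output.
            long total = 0;
            for (Future<Long> f : partials) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for chunks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a chunk failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[1_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data, 100, 4)); // prints 500500
    }
}
```

Because each chunk is summed without looking at any other chunk, the chunks can run in any order and on any thread; only the final combining step needs all of the partial results.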

How MapReduce Works

Parallel processing breaks data into smaller chunks, processes each chunk in parallel, and then gathers the partial results to form the final output. Because HDFS has already divided the stored files into blocks, all that's left for MapReduce to do is process the data.
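To make the idea of blocks concrete: HDFS stores each file as a sequence of fixed-size blocks, and by default MapReduce launches roughly one map task per block-sized input split. The sketch below assumes the defaults of Hadoop 2 and later, a 128 MB block size (dfs.blocksize) and a replication factor of 3 (dfs.replication), both of which are configurable. The HdfsBlockMath class is hypothetical and only does the arithmetic; all sizes are in bytes.

```java
// Hypothetical helper: arithmetic only, no Hadoop dependency.
class HdfsBlockMath {

    // Assumed default HDFS block size in Hadoop 2+ (dfs.blocksize): 128 MB.
    static final long DEFAULT_BLOCK_SIZE = 128L * 1024 * 1024;
    // Assumed default replication factor (dfs.replication).
    static final int DEFAULT_REPLICATION = 3;

    // Number of blocks a file occupies: ceiling of fileSize / blockSize.
    static long blockCount(long fileSize, long blockSize) {
        if (fileSize == 0) {
            return 0; // an empty file has no blocks
        }
        return (fileSize + blockSize - 1) / blockSize;
    }

    // Size of the last block; HDFS does not pad it to the full block size.
    static long lastBlockSize(long fileSize, long blockSize) {
        if (fileSize == 0) {
            return 0;
        }
        long remainder = fileSize % blockSize;
        return remainder == 0 ? blockSize : remainder;
    }

    // Raw storage used across the cluster once every block is replicated.
    static long rawStorage(long fileSize, int replication) {
        return fileSize * replication;
    }

    public static void main(String[] args) {
        long oneGiBPlusOneByte = 1024L * 1024 * 1024 + 1;
        System.out.println(blockCount(oneGiBPlusOneByte, DEFAULT_BLOCK_SIZE));    // prints 9
        System.out.println(lastBlockSize(oneGiBPlusOneByte, DEFAULT_BLOCK_SIZE)); // prints 1
    }
}
```

Under these assumptions, a file of 1 GiB plus one byte occupies nine blocks, the last of which holds a single byte, so a job reading it would typically start about nine map tasks.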

Processing involves several complex steps, but Hadoop already performs some of them when it stores the data, so MapReduce begins with the procedure below. In this article, we walk through the procedure using a simple example:

Consider that our data is distributed across two nodes, and the content of each file is:

  • File1: Good Morning Tom Morning Jerry.

  • File2: Hello Tom Hello Jerry.

Step 1: Mapping

The mapper contains independent logic that is applied to each record of its input. The driver program specifies which files are to be processed and the input format used to read them. The mapper's output is a set of <key, value> pairs, which are not saved in HDFS; instead, they are written to an intermediate file on the local disk of the node running the task, where they await further processing.

Following the example, the mapper will generate output as:

File1
<Good, 1>
<Morning, 1>
<Tom, 1>
<Morning, 1>
<Jerry, 1>

File2
<Hello, 1>
<Tom, 1>
<Hello, 1>
<Jerry, 1>
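The map step can be simulated in plain Java to show what the mapper emits for each file. This sketch does not use Hadoop. It assumes that words are separated by whitespace and that punctuation is stripped (an assumption made here so that "Jerry." is counted as "Jerry", matching the listing above). The MapPhase class is hypothetical, and Map.Entry stands in for Hadoop's <key, value> pairs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the map step; no Hadoop classes are used.
class MapPhase {

    // Emits a <word, 1> pair for every word in one line of input,
    // in the order the words appear.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> output = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            // Assumption: strip punctuation so "Jerry." becomes "Jerry".
            String word = token.replaceAll("\\p{Punct}", "");
            if (!word.isEmpty()) {
                output.add(Map.entry(word, 1));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(map("Good Morning Tom Morning Jerry."));
        // prints [Good=1, Morning=1, Tom=1, Morning=1, Jerry=1]
    }
}
```

Note that the mapper does no counting of its own: "Morning" is emitted twice with a value of 1 each time, and aggregation is left to later steps.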


Step 2: Shuffle and Combine (Sorting)

Shuffle is the intermediate step between mapping and reducing: the framework sorts the mapper output by key and groups the values for each key, so that each reducer receives its input in the form <key, list of values>. Before the shuffle, an optional combiner can aggregate each mapper's output locally to reduce the amount of data sent over the network; this combine step can be skipped if it is not required.

For the given example, the combine stage generates the following output:

File1
<Good, 1>
<Morning, 2>
<Tom, 1>
<Jerry, 1>

File2
<Hello, 2>
<Tom, 1>
<Jerry, 1>
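The combine and shuffle steps can be sketched the same way. In this plain-Java simulation (no Hadoop; the ShufflePhase class is hypothetical), combine sums the counts within a single mapper's output, and shuffle merges the combined outputs of all mappers, sorts them by key, and groups the values for each key into a list. The partition method uses the same formula as Hadoop's default HashPartitioner to pick a reducer for a key; because it is applied here to Java String hash codes rather than to Hadoop Text keys, the actual partition numbers would differ from Hadoop's.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the combine and shuffle steps; no Hadoop classes are used.
class ShufflePhase {

    // Combine: sum the counts for each word within ONE mapper's output.
    // LinkedHashMap keeps the order in which words first appeared.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> mapperOutput) {
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : mapperOutput) {
            combined.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return combined;
    }

    // Shuffle: merge the outputs of all mappers, sort by key (TreeMap),
    // and group every value for a key into a list.
    static Map<String, List<Integer>> shuffle(List<Map<String, Integer>> combinedOutputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map<String, Integer> output : combinedOutputs) {
            for (Map.Entry<String, Integer> pair : output.entrySet()) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        return grouped;
    }

    // Same formula as Hadoop's HashPartitioner, applied to String.hashCode().
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        Map<String, Integer> file1 = combine(List.of(
                Map.entry("Good", 1), Map.entry("Morning", 1), Map.entry("Tom", 1),
                Map.entry("Morning", 1), Map.entry("Jerry", 1)));
        Map<String, Integer> file2 = combine(List.of(
                Map.entry("Hello", 1), Map.entry("Tom", 1),
                Map.entry("Hello", 1), Map.entry("Jerry", 1)));
        System.out.println(file1); // prints {Good=1, Morning=2, Tom=1, Jerry=1}
        System.out.println(shuffle(List.of(file1, file2)));
        // prints {Good=[1], Hello=[2], Jerry=[1, 1], Morning=[2], Tom=[1, 1]}
    }
}
```

After the shuffle, "Tom" and "Jerry" each carry one value from each file, which is exactly the <key, list of values> input the reducer needs.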


Step 3: Reduce

The reducer finally merges the outputs received from the shuffle and combine steps and generates the final output. It processes each <key, list of values> pair and emits the result as <key, value> pairs, aggregating the data generated from the different chunks. The reducer's output is stored in HDFS and is the final output of the MapReduce job.

The final output for the stated example will be:

<Good, 1>
<Hello, 2>
<Jerry, 2>
<Morning, 2>
<Tom, 2>
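The reduce step then sums each list of values. This plain-Java sketch (no Hadoop; ReducePhase is a hypothetical name) takes sorted <key, list of values> groups, as produced by the shuffle, and emits one <key, value> pair per word, the same summation that the IntSumReducer class performs in the WordCount program in this article. Because the input is sorted by key, the output also comes out in key order, just as a single Hadoop reducer writes its output sorted by key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the reduce step; no Hadoop classes are used.
class ReducePhase {

    // For every <key, list of values> group, emit <key, sum of values>.
    // LinkedHashMap preserves the (sorted) order of the incoming groups.
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            int sum = 0;
            for (int value : group.getValue()) {
                sum += value;
            }
            result.put(group.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // Shuffled input for the two example files, sorted by key.
        Map<String, List<Integer>> grouped = new TreeMap<>(Map.of(
                "Good", List.of(1),
                "Hello", List.of(2),
                "Jerry", List.of(1, 1),
                "Morning", List.of(2),
                "Tom", List.of(1, 1)));
        System.out.println(reduce(grouped));
        // prints {Good=1, Hello=2, Jerry=2, Morning=2, Tom=2}
    }
}
```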


[Figure: MapReduce Process Sample and Output]

The entire process of MapReduce is illustrated here:

[Figure: MapReduce Process (Source)]

Understanding MapReduce Practically: MapReduce Applications

To put the algorithm described above into practice, you write a MapReduce job in a programming language that Hadoop supports. MapReduce jobs can be written in Java, in C++ (through Hadoop Pipes), or in languages such as Python and Ruby (through Hadoop Streaming). However, Java is the primary language for implementing MapReduce on HDFS.

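To run MapReduce on big data stored in HDFS, a job must provide:

  • The input and output locations.

  • The map and reduce functions, implemented by subclassing the Mapper and Reducer classes (org.apache.hadoop.mapreduce API) or by implementing the Mapper and Reducer interfaces (older org.apache.hadoop.mapred API).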

Serialization of Inputs

These functions must also work with keys and values that can be serialized. For that, Hadoop provides two interfaces:

1. Writable Interface

The Writable interface defines how an object is serialized to a binary output stream and deserialized from an input stream. Classes that implement it include ArrayWritable and ByteWritable. To create a custom class using the Writable interface, you must implement the following two methods (both may throw IOException):

 void readFields(DataInput in): Reads the object's fields from the input stream.
 void write(DataOutput out): Serializes the object's fields to the output stream.
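As an illustration of the Writable contract, here is a minimal sketch of a custom value type. It is written against java.io only, so that it compiles without Hadoop on the classpath; in a real job, the class would also declare implements org.apache.hadoop.io.Writable, whose two methods have these signatures. The class name WordCountRecord, its fields, and the roundTrip helper are hypothetical. The no-argument constructor matters because Hadoop creates Writable instances reflectively and then calls readFields on them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical value type following Hadoop's Writable contract.
// In a real job: class WordCountRecord implements org.apache.hadoop.io.Writable
class WordCountRecord {
    private String word;
    private int count;

    // No-arg constructor: Hadoop instantiates Writables reflectively, then calls readFields.
    WordCountRecord() { }

    WordCountRecord(String word, int count) {
        this.word = word;
        this.count = count;
    }

    // Serializes the fields to the output stream, in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    // Reads the fields back in exactly the order write() produced them.
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }

    String getWord() { return word; }

    int getCount() { return count; }

    // Hypothetical helper: serialize to bytes, then read them into a fresh object.
    static WordCountRecord roundTrip(WordCountRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream dataOut = new DataOutputStream(bytes);
            original.write(dataOut);
            dataOut.flush();
            WordCountRecord copy = new WordCountRecord();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The key design rule is symmetry: readFields must read the fields in the same order and with the same types that write produced them, or the deserialized object will be corrupt.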

2. WritableComparable Interface

This interface extends Writable with a comparison, which the framework uses to sort keys during the shuffle. To create a custom key class, implement the WritableComparable<T> interface and the following methods:

 void readFields(DataInput in): Reads the object's fields from the input stream.
 void write(DataOutput out): Serializes the object's fields to the output stream.
 int compareTo(T o): Compares this key with another key of the same type to determine the sort order.
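A custom key needs an ordering in addition to serialization. The sketch below again uses java.io only; in a real job, the class would declare implements org.apache.hadoop.io.WritableComparable<YearMonthKey>. YearMonthKey is a hypothetical name, and the year-then-month ordering is an assumption chosen for illustration. The sketch also overrides hashCode and equals, because Hadoop's default HashPartitioner uses the key's hashCode to choose a reducer, so equal keys must produce equal hash codes.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

// Hypothetical key type following Hadoop's WritableComparable contract.
// In a real job: class YearMonthKey implements WritableComparable<YearMonthKey>
class YearMonthKey implements Comparable<YearMonthKey> {
    private int year;
    private int month;

    YearMonthKey() { } // needed for reflective instantiation

    YearMonthKey(int year, int month) {
        this.year = year;
        this.month = month;
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
    }

    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
    }

    // Sort order used when keys are sorted: by year first, then by month.
    @Override
    public int compareTo(YearMonthKey other) {
        int byYear = Integer.compare(year, other.year);
        return byYear != 0 ? byYear : Integer.compare(month, other.month);
    }

    // Equal keys must hash identically so they reach the same reducer.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof YearMonthKey)) {
            return false;
        }
        YearMonthKey k = (YearMonthKey) o;
        return year == k.year && month == k.month;
    }

    @Override
    public int hashCode() {
        return Objects.hash(year, month);
    }

    @Override
    public String toString() {
        return year + "-" + month;
    }
}
```

Sorting a list of these keys with Collections.sort places 2019-12 before 2020-1 and 2020-5, which is the order in which a reducer would receive them.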

Below is a simple example of a MapReduce job over a set of words. WordCount counts the number of occurrences of each word in the input.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapper: for each line of input, emits <word, 1> for every whitespace-separated token.
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    // Reused objects avoid allocating a new Writable for every record.
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // key is the byte offset of the line (unused); value is the line itself.
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reducer (also used as the combiner): sums all counts received for a word.
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  // Driver: configures and submits the job.
  // args[0] is the input path; args[1] is the output path, which must not already exist.
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // The combiner pre-aggregates each mapper's output locally before the shuffle.
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Blocks until the job finishes; exits with 0 on success and 1 on failure.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


Daemon Processes

To run MapReduce jobs, Hadoop relies on long-running daemon processes. As MapReduce also follows the master-slave architecture, classic MapReduce (MRv1, in Hadoop 1.x) has two types of daemons: the JobTracker, the master process, and the TaskTracker, the slave process. (In Hadoop 2 and later, YARN replaces them with the ResourceManager, the NodeManagers, and a per-application ApplicationMaster.)

JobTracker

This is the master process. It accepts submitted jobs, checks resource availability, schedules tasks on TaskTrackers, and tracks each task through its life cycle using the status updates it receives from the TaskTrackers. The JobTracker is a single point of failure: if the JobTracker process fails, the running MapReduce jobs fail.

TaskTracker

TaskTracker is the daemon process that runs on each slave node and executes the tasks assigned by the JobTracker. Throughout execution, it sends periodic heartbeat messages to the JobTracker to report that it is alive and to keep it updated on the status of its tasks.
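The heartbeat mechanism can be illustrated with a small simulation. This is a sketch of the idea only, not the JobTracker's actual code: the hypothetical HeartbeatMonitor records the time of each tracker's latest heartbeat and treats a tracker as lost if no heartbeat has arrived within a timeout. The tracker names and the timeout value are assumptions chosen for the example, and time is passed in explicitly so the behavior is easy to test.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical master-side bookkeeping for heartbeats; not Hadoop's implementation.
class HeartbeatMonitor {
    private final long timeoutMillis;
    // Tracker name -> timestamp (ms) of its most recent heartbeat.
    private final Map<String, Long> lastSeen = new HashMap<>();

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a tracker reports in; refreshes its timestamp.
    void heartbeat(String tracker, long nowMillis) {
        lastSeen.put(tracker, nowMillis);
    }

    // Trackers whose last heartbeat is older than the timeout are considered lost;
    // a real master would then reschedule their tasks on other nodes.
    List<String> lostTrackers(long nowMillis) {
        return lastSeen.entrySet().stream()
                .filter(e -> nowMillis - e.getValue() > timeoutMillis)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

For example, with a 10-second timeout, a tracker that last reported at t = 0 is reported as lost at t = 12 seconds, while one that reported at t = 8 seconds is still considered alive.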

[Figure: MapReduce Architecture (Source)]

However, for processing that doesn't require a reduce operation at the end, Hadoop also supports "map-only jobs," which only map the data.

Map-Only Jobs

Whenever the processing only requires mapping the data, the reduce operation can be eliminated, and the mapper's output becomes the job's final output. This is enabled by calling job.setNumReduceTasks(0).
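The effect of setting the number of reduce tasks to zero can be sketched in plain Java. In this hypothetical simulation (the MapOnlyJob class and the filtering task are assumptions chosen for illustration), each input record is mapped independently, and the mapper's output is the final output: there is no sorting, grouping, or aggregation, so records come out in the order they were read. Filtering records, such as keeping only log lines that contain a keyword, is a typical use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a map-only job; no Hadoop classes are used.
class MapOnlyJob {

    // The "mapper": keeps each line that contains the keyword.
    // Every line is handled independently of every other line.
    static List<String> run(List<String> inputLines, String keyword) {
        List<String> output = new ArrayList<>();
        for (String line : inputLines) {
            if (line.contains(keyword)) {
                output.add(line);
            }
        }
        // With zero reducers, this list is the final output: no shuffle or sort happens.
        return output;
    }
}
```

Running it over the lines "zeta ERROR disk", "INFO ok", and "alpha ERROR net" with the keyword "ERROR" keeps the first and third lines in their original order; "zeta" stays ahead of "alpha" because nothing sorts the output.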

Also, it is important to remember that in a map-only job, the mapper output is not written to intermediate files on the local disk for transfer to reducers, as it is in a full MapReduce job; since there are no reducers, it is written directly to the job's output path in HDFS. Because the sort and shuffle phases are skipped, map-only jobs generally run faster than jobs with a reduce phase. The code below illustrates how a map-only job can be configured in Java.

package org.dedunumax.mapperOnly;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class App
{
    public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException
    {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "Mapper_Only_Job");

        job.setJarByClass(App.class);
        // Map is the project's Mapper subclass, defined elsewhere (not shown in this article);
        // it must emit Text keys and Text values to match the output classes below.
        job.setMapperClass(Map.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Sets reducer tasks to 0: mapper output goes straight to the output path
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}


(Source: Github)

Are There Other Options for Parallel Processing of Big Data?

Continuous advances in technology have produced many other options for completing such tasks. MapReduce is the most basic parallel processing model that gets the work done, and it is still one of the best options for processing big data, but it has several limitations.

In MapReduce, the reduce phase cannot start until all mappers have finished. Moreover, the order in which map and reduce tasks run cannot be controlled. Also, MapReduce cannot speed up processing of unindexed data, because each job scans its entire input. Because of these limitations, other tools are available that perform the same tasks more efficiently and quickly.

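Examples include Apache Spark, Hive, and Pig. All of them can work with data stored in the Hadoop Distributed File System, but more efficiently or more conveniently: Hive and Pig offer higher-level languages (HiveQL and Pig Latin) that are translated into jobs for an underlying execution engine, while Spark keeps intermediate data in memory, which greatly speeds up iterative workloads. The coming era belongs to intelligent machines, and because big data analytics plays a vital role in teaching these machines and increasing their accuracy, the importance of MapReduce and HDFS will only grow.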

