
Hadoop Word Count Program in Scala


You must have seen a Hadoop word count program in Java, Python, or C/C++ before, but probably not in Scala. So, let's learn how to build a word count program in Scala.

Submitting a job written in Scala to Hadoop is not entirely straightforward, because Hadoop itself is written in Java and its MapReduce APIs know nothing about Scala's functional constructs. Since Scala compiles to JVM bytecode, though, we can bridge the gap by implementing Hadoop's Java interfaces directly from Scala.

To write a word count program in Scala, we need to follow these steps:

  • Create a Scala project with SBT, using a Scala and SBT version of your choice.
  • Add the hadoop-core dependency to build.sbt (linked in the Resources section below); a minimal build.sbt sketch follows this list.
  • Create a Scala object, WordCount, with a main method in the project.
  • Under the Scala object, create a class Map that extends MapReduceBase and mixes in the Mapper interface.
  • Provide a body for the map function.
  • Under the Scala object, create another class, say Reduce, that extends MapReduceBase and mixes in the Reducer interface.
  • Provide a body for the reduce function.
  • Provide the necessary job configuration in the main method of the Scala object.
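
For reference, a minimal build.sbt might look like the following. The project name, Scala version, and hadoop-core version (1.2.1 here) are placeholders; adapt them to your own setup:

name := "hadoop-wordcount"

version := "1.0"

scalaVersion := "2.11.8"

// The classic Hadoop MapReduce API (org.apache.hadoop.mapred) lives in hadoop-core.
libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"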

Here is an example of a word count program written in Scala:

import java.io.IOException
import java.util._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred._
import org.apache.hadoop.util._

object WordCount {

  // Mapper: splits each input line into tokens and emits a (token, 1) pair per token.
  class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {
    private final val one = new IntWritable(1)
    private val word = new Text()

    @throws[IOException]
    def map(key: LongWritable, value: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
      val line: String = value.toString
      line.split(" ").foreach { token =>
        word.set(token)
        output.collect(word, one)
      }
    }
  }

  // Reducer: sums the counts for each word. Note that Hadoop reuses the
  // IntWritable instance handed out by the values iterator, so we accumulate
  // into a plain Int while iterating rather than collecting the writables
  // into a list first (which would end up summing copies of the last value).
  class Reduce extends MapReduceBase with Reducer[Text, IntWritable, Text, IntWritable] {
    @throws[IOException]
    def reduce(key: Text, values: Iterator[IntWritable], output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
      var sum = 0
      while (values.hasNext) {
        sum += values.next().get()
      }
      output.collect(key, new IntWritable(sum))
    }
  }

  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val conf: JobConf = new JobConf(this.getClass)
    conf.setJobName("WordCountScala")
    // Types of the final output key/value pairs.
    conf.setOutputKeyClass(classOf[Text])
    conf.setOutputValueClass(classOf[IntWritable])
    conf.setMapperClass(classOf[Map])
    // Summing is associative and commutative, so the reducer can double as a combiner.
    conf.setCombinerClass(classOf[Reduce])
    conf.setReducerClass(classOf[Reduce])
    conf.setInputFormat(classOf[TextInputFormat])
    conf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
    // Input and output paths are taken from the command line.
    FileInputFormat.setInputPaths(conf, new Path(args(0)))
    FileOutputFormat.setOutputPath(conf, new Path(args(1)))
    JobClient.runJob(conf)
  }
}
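
To make the data flow concrete: given the input line "hello world hello", the map function emits (hello, 1), (world, 1), and (hello, 1). After the shuffle phase groups values by key, the reduce function receives hello with [1, 1] and world with [1], and emits (hello, 2) and (world, 1).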

So far, we've written our program in Scala. Now we need to submit this job to Hadoop, which takes a few steps:

  • Add the sbt-assembly plugin to plugins.sbt under the project directory (linked in the Resources section below); a sketch follows this list.
  • Open a terminal and change directory to the root of the project.
  • In the terminal, run the command sbt clean compile assembly.
  • This command will build the jar under the target/scala-<version> folder of the project.
  • Create a directory in HDFS with the following command:
    $HADOOP_HOME/bin/hadoop fs -mkdir input_dir
  • Put some data into the newly created directory with the following command:
    $HADOOP_HOME/bin/hadoop fs -put sample.txt input_dir
  • Now submit the job to Hadoop with the following command:
    $HADOOP_HOME/bin/hadoop jar jar_name.jar input_dir output_dir
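
For reference, project/plugins.sbt needs just the one line below. The plugin version (0.14.5 here) is only an example; use whatever release is current:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")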

The jar in the last command is the same one that was built in the target/scala-<version> directory of the project.

You can see the output by using the following command:

$HADOOP_HOME/bin/hadoop fs -ls output_dir/
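
That command lists the files the job produced. The counts themselves land in a part file (typically part-00000 with the classic API), which you can print with:

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000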

If you encounter any problems building the jar with the sbt clean compile assembly command, you most likely have duplicate files coming from different dependencies, and you need to include a mergeStrategy in build.sbt. You can find related information here.
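
A common pattern with sbt-assembly 0.14.x (treat this as a starting point under that assumption, not a universal setting) is to discard colliding META-INF entries and keep the first copy of everything else:

assemblyMergeStrategy in assembly := {
  // Signature files and other META-INF entries frequently collide; drop them.
  case PathList("META-INF", _*) => MergeStrategy.discard
  // For everything else, keep the first copy found on the classpath.
  case _ => MergeStrategy.first
}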

Resources

  • You can find the hadoop-core dependency here.
  • You can find the sbt-assembly plugin here.
  • You can find this project here.



Published at DZone with permission of Akash Sethi, DZone MVB. See the original article here.

