
WordCount on Hadoop With Scala

We use Scala to implement a simple MapReduce job and then run it on HDInsight, using WordCount as an example.


Today, we will use Scala to implement a simple MapReduce job and then run it using HDInsight.

We shall add the assembly plugin to our assembly.sbt file:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") 
Then, we will add the Hadoop core dependency to our build.sbt file. We will also configure the merge strategy to avoid deduplication errors:

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
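
For reference, a minimal complete build.sbt might look like the following sketch. The project name, version, and exact Scala release are assumptions, inferred from the assembly JAR name mentioned later in the post:

name := "ScalaHadoop" // assumed from the assembly JAR name below
version := "1.0" // assumed
scalaVersion := "2.12.1" // any Scala 2.12.x release should work
// ...plus the libraryDependencies and merge strategy settings shown above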

We will use WordCount as an example.

The original Java class shall be transformed into a Scala class:

package com.gkatzioura.scala

import java.lang.Iterable
import java.util.StringTokenizer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
import scala.collection.JavaConverters._

/**
  * Created by gkatzioura on 2/14/17.
  */
object WordCount {

  // Mapper: tokenizes each input line and emits (word, 1) for every token.
  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {

    val one = new IntWritable(1)
    val word = new Text()

    override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val itr = new StringTokenizer(value.toString)
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken())
        context.write(word, one)
      }
    }
  }

  // Reducer (also used as a combiner): sums the counts emitted for each word.
  class IntSumReader extends Reducer[Text, IntWritable, Text, IntWritable] {
    override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      val sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }


  // Configures and submits the job; args(0) is the input path, args(1) the output path.
  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration,"word count")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[IntSumReader])
    job.setReducerClass(classOf[IntSumReader])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if(job.waitForCompletion(true))  0 else 1)
  }

}
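
To recap what the job computes: given an input file containing the line "hello world hello", the output directory would contain a part file with tab-separated counts along these lines:

hello	2
world	1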


Then, we will build our example:

sbt clean compile assembly 


Our new JAR will reside at target/scala-2.12/ScalaHadoop-assembly-1.0.jar.
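
Although the next post covers HDInsight, the assembled JAR can be submitted to any Hadoop installation as a quick sanity check. A sketch of the command, with placeholder input and output paths:

hadoop jar target/scala-2.12/ScalaHadoop-assembly-1.0.jar com.gkatzioura.scala.WordCount /input /output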

In the next post, we shall run our code using Azure’s HDInsight.

You can find the code on GitHub.
