WordCount on Hadoop With Scala
Using WordCount as an example, we implement a simple MapReduce job in Scala and then run it on HDInsight.
Today, we will use Scala to implement a simple MapReduce job and then run it using HDInsight.
First, we shall add the sbt-assembly plugin to our assembly.sbt file:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Then we will add the Hadoop core dependency to our build.sbt file. We will also configure the merge strategy to avoid deduplication errors:

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
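For reference, a minimal build.sbt carrying both the dependency and the merge strategy might look like the following sketch. The project name and version are inferred from the JAR name produced later, and the Scala patch version is an assumption (any 2.12.x will do):

// build.sbt - minimal sketch; adjust name, version, and scalaVersion to your project
name := "ScalaHadoop"

version := "1.0"

scalaVersion := "2.12.1"

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}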
We will use WordCount as an example.
The original Java class shall be transformed to a Scala class:
package com.gkatzioura.scala

import java.lang.Iterable
import java.util.StringTokenizer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

import scala.collection.JavaConverters._

/**
  * Created by gkatzioura on 2/14/17.
  */
package object WordCount {

  /** Emits a (word, 1) pair for every token of the input line. */
  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {

    val one = new IntWritable(1)
    val word = new Text()

    override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val itr = new StringTokenizer(value.toString)
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken())
        context.write(word, one)
      }
    }
  }

  /** Sums the counts emitted for each word; also used as the combiner. */
  class IntSumReader extends Reducer[Text, IntWritable, Text, IntWritable] {

    override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      val sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }

  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration, "word count")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[IntSumReader])
    job.setReducerClass(classOf[IntSumReader])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}
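To make the mapper and reducer logic concrete, here is a plain-Scala sketch of what the job computes, with no Hadoop involved; the sample line is of course arbitrary:

// The mapper emits a (word, 1) pair per token; the reducer sums the counts per word.
val line   = "to be or not to be"
val mapped = line.split("\\s+").toSeq.map(word => (word, 1))
val counts = mapped.groupBy(_._1).map { case (word, pairs) => word -> pairs.map(_._2).sum }
// counts: to -> 2, be -> 2, or -> 1, not -> 1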
Then, we will build our example:

sbt clean compile assembly

Our new JAR will reside in target/scala-2.12/ScalaHadoop-assembly-1.0.jar.
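If you have a local Hadoop installation handy, you can already try the assembled JAR before moving to HDInsight. A rough sketch; the input and output paths are placeholders, and the class name reflects the fact that main lives inside a package object:

# Run the assembled job; adjust the input and output paths to your own data.
hadoop jar target/scala-2.12/ScalaHadoop-assembly-1.0.jar \
  com.gkatzioura.scala.WordCount.package \
  /path/to/input /path/to/output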
In the next post, we shall run our code using Azure’s HDInsight.
You can find the code on GitHub.
Published at DZone with permission of Emmanouil Gkatziouras, DZone MVB. See the original article here.