
Word Count With Storm and Scala


Knowing how to implement a word count Hadoop job using Scala and upload it to HDInsight is good, but having a real-world example of how to do so is better.


Apache Storm is a free and open-source distributed real-time computation system that runs on the JVM.

To get started, we will implement a very simple example. Previously, we implemented a word count Hadoop job using Scala and uploaded it to HDInsight. This time, we will apply the same word count concept to a real-time scenario, implementing a word count topology with Apache Storm. Our source code will be based on the official Storm examples.

Storm works with spouts and bolts. Spouts are stream sources that emit tuples, while bolts consume those streams, process the tuples, and optionally emit new ones. A topology wires spouts and bolts together.

First, we shall implement a spout that will emit fake data events — in our case, sentences.

package com.gkatzioura.scala.storm

import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils

import scala.util.Random

/**
  * Created by gkatzioura on 2/17/17.
  */
class RandomSentenceSpout extends BaseRichSpout {

  var _collector: SpoutOutputCollector = _
  var _rand: Random = _

  // Called by Storm in a loop; emits one random sentence every 100 ms.
  override def nextTuple(): Unit = {
    Utils.sleep(100)
    val sentences = Array(
      "the cow jumped over the moon",
      "an apple a day keeps the doctor away",
      "four score and seven years ago",
      "snow white and the seven dwarfs",
      "i am at two with nature")
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }

  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = Random
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}


The next step is to implement a bolt that splits each sentence into words and emits them:

package com.gkatzioura.scala.storm

import java.text.BreakIterator

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class SplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance
    boundary.setText(sentence)

    // Walk the word boundaries; boundary.next must be checked against
    // BreakIterator.DONE before it is used as a substring index.
    var start = boundary.first
    var end = boundary.next
    while (end != BreakIterator.DONE) {
      val word = sentence.substring(start, end).replaceAll("\\s+", "")
      if (!word.isEmpty) {
        collector.emit(new Values(word))
      }
      start = end
      end = boundary.next
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
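
Nothing in the splitting logic is Storm-specific, so it is easy to sanity-check outside a topology. Here is a minimal sketch as a plain Scala program (the SplitCheck object and the sample sentence are just for illustration):

import java.text.BreakIterator

object SplitCheck extends App {
  val sentence = "the cow jumped over the moon"
  val boundary = BreakIterator.getWordInstance
  boundary.setText(sentence)

  var start = boundary.first
  var end = boundary.next
  while (end != BreakIterator.DONE) {
    val word = sentence.substring(start, end).replaceAll("\\s+", "")
    if (!word.isEmpty) println(word) // prints: the, cow, jumped, over, the, moon
    start = end
    end = boundary.next
  }
}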


The next step is the word count bolt:

package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class WordCountBolt extends BaseBasicBolt {

  // Per-task running counts. The fieldsGrouping on "word" in the topology
  // guarantees that all occurrences of a word arrive at the same task.
  val counts = scala.collection.mutable.Map[String, Int]()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val word = input.getString(0)
    val optCount = counts.get(word)
    if (optCount.isEmpty) {
      counts.put(word, 1)
    } else {
      counts.put(word, optCount.get + 1)
    }
    // Emit the word together with its updated count.
    collector.emit(new Values(word, Int.box(counts(word))))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word", "count"))
  }
}
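
As a side note, the get-or-update sequence in execute can be collapsed with the mutable map's getOrElse. A sketch of an equivalent method body, assuming it drops into the same class:

override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
  val word = input.getString(0)
  val count = counts.getOrElse(word, 0) + 1
  counts.put(word, count)
  collector.emit(new Values(word, Int.box(count)))
}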


The final step is to create our topology, which also takes care of whether we run locally or in a cluster environment. Note the groupings: shuffleGrouping distributes sentences randomly among the split bolt's tasks, while fieldsGrouping on "word" routes all occurrences of a given word to the same count bolt task, which is what keeps the per-word counts consistent:

package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
  * Created by gkatzioura on 2/18/17.
  */
object WordCountTopology {

  def main(args: Array[String]): Unit = {
    println("Hello, world!")

    val builder = new TopologyBuilder
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()
    conf.setDebug(true)

    if (args != null && args.length > 0) {
      // Cluster mode: args(0) is the topology name.
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    } else {
      // Local mode: run for ten seconds, then shut down.
      conf.setMaxTaskParallelism(3)
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}

Now, we shall build our app. To do so, we need to include the sbt-assembly plugin in our project/plugins.sbt file:

 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") 

Our build.sbt file is as follows:

 name := "ScalaStorm" version := "1.0" 
 scalaVersion := "2.12.1" 
 scalacOptions += "-Yresolve-term-conflict:package" 
 libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided" 
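
A note on the provided scope: the Storm runtime supplies storm-core on the cluster, so we keep it out of the fat jar. The flip side is that a plain sbt run fails locally, because provided dependencies are left off the runtime classpath. If you want to launch the topology straight from sbt, the usual sbt-assembly recipe (a sketch in the sbt 0.13 syntax used here) re-adds them to the run task:

run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run)).evaluated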

Then, we issue a build:

 sbt clean compile assembly 
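
With sbt-assembly's default naming, the fat jar should land at target/scala-2.12/ScalaStorm-assembly-1.0.jar, derived from the name and version in build.sbt. On a machine with the Storm CLI installed, submitting it to a cluster would look roughly like this (the trailing topology name is just an example value for args(0)):

 storm jar target/scala-2.12/ScalaStorm-assembly-1.0.jar com.gkatzioura.scala.storm.WordCountTopology word-count-topology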


You can find the source code on GitHub.

In the next post, we shall deploy our Storm app to HDInsight.


Topics:
storm, scala, big data, hadoop, tutorial
