Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Word Count With Storm and Scala

DZone 's Guide to

Word Count With Storm and Scala

Sure, knowing how to implemented a word count Hadoop job using Scala and uploaded it to HDInsight is good — but having a real-world example of how to do so is better.

· Big Data Zone ·
Free Resource

Apache Storm is a free and open-source distributed realtime computation system running on the JVM.

To get started, we will implement a very simple example. Previously, we implemented a word count Hadoop job using Scala and uploaded it to HDInsight. We will focus on the same word count concept, but for real-time cases and implementing a word count topology utilizing Apache Storm. Our source code will be based on the official Storm examples.

Storm works with spouts and bolts.

First, we shall implement a spout that will emit fake data events — in our case, sentences.

 package com.gkatzioura.scala.storm
 import org.apache.storm.spout.SpoutOutputCollector
 import org.apache.storm.task.TopologyContext
 import org.apache.storm.topology.OutputFieldsDeclarer
 import org.apache.storm.topology.base.BaseRichSpout
 import org.apache.storm.tuple. {
  Fields,
  Values
 }
 import org.apache.storm.utils.Utils
 import scala.util.Random /** * Created by gkatzioura on 2/17/17. */ class RandomSentenceSpout extends BaseRichSpout {
  var _collector: SpoutOutputCollector = _
  var _rand: Random = _ override def nextTuple(): Unit = {
   Utils.sleep(100) val sentences = Array("the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature") val sentence = sentences(_rand.nextInt(sentences.length)) _collector.emit(new Values(sentence))
  }
  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
   _collector = collector _rand = Random
  }
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
   declarer.declare(new Fields("word"))
  }
 }


The next step is to implement a bolt that splits the sentences and emits them:

 package com.gkatzioura.scala.storm
 import java.text.BreakIterator
 import org.apache.storm.topology. {
  BasicOutputCollector,
  OutputFieldsDeclarer
 }
 import org.apache.storm.topology.base.BaseBasicBolt
 import org.apache.storm.tuple. {
  Fields,
  Tuple,
  Values
 } /** * Created by gkatzioura on 2/18/17. */
 class SplitSentenceBolt extends BaseBasicBolt {
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
   val sentence = input.getString(0) val boundary = BreakIterator.getWordInstance boundary.setText(sentence) var start = boundary.first
   var end: Int = start
   while (end != BreakIterator.DONE) {
    end = boundary.next val word = sentence.substring(start, end).replaceAll("\\s+", "") start = end
    if (!word.equals("")) {
     collector.emit(new Values(word))
    }
   }
  }
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
   declarer.declare(new Fields("word"))
  }
 }


The next step is the word count bolt:

 package com.gkatzioura.scala.storm
 import org.apache.storm.topology. {
  BasicOutputCollector,
  OutputFieldsDeclarer
 }
 import org.apache.storm.topology.base.BaseBasicBolt
 import org.apache.storm.tuple. {
   Fields,
   Tuple,
   Values
  }
  /** * Created by gkatzioura on 2/18/17. */
 class WordCountBolt extends BaseBasicBolt {
  val counts = scala.collection.mutable.Map[String, Int]()
  override def execute(input: Tuple, collector: BasicOutputCollector):
   Unit = {
    val word = input.getString(0)
    val optCount = counts.get(word)
    if (optCount.isEmpty) {
     counts.put(word, 1)
    } else {
     counts.put(word, optCount.get + 1)
    }
    collector.emit(new Values(word, counts))
   }
  override def declareOutputFields(declarer: OutputFieldsDeclarer):
   Unit = {
    declarer.declare(new Fields("word", "count"));
   }
 }


The final step is to create our topology, which takes care of whether we run locally or in a cluster environment:

package com.gkatzioura.scala.storm
import org.apache.storm. {
 Config,
 LocalCluster,
 StormSubmitter
}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields
 /** * Created by gkatzioura on 2/18/17. */
object WordCountTopology {
 def main(args: Array[String]): Unit = {
  println("Hello, world!")
  val builder = new TopologyBuilder
  builder.setSpout("spout", new RandomSentenceSpout, 5)
  builder.setBolt("split",
   new SplitSentenceBolt, 8).shuffleGrouping("spout")
  builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split",
   new Fields("word"))
  val conf = new Config()
  conf.setDebug(true)
  if (args != null && args.length > 0) {
   conf.setNumWorkers(3)
   StormSubmitter.submitTopology(args(0),
    conf, builder.createTopology())
  } else {
   conf.setMaxTaskParallelism(3)
   val cluster = new LocalCluster
   cluster.submitTopology("word-count", conf, builder.createTopology())
   Thread.sleep(10000) cluster.shutdown()
  }
 }
}

Now, we shall build our app. To do so we need to include the assembly plugin in our plugins.sbt file.

 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") 

Our SBT file is as follows:

 name := "ScalaStorm" version := "1.0" 
 scalaVersion := "2.12.1" 
 scalacOptions += "-Yresolve-term-conflict:package" 
 libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided" 

Then, we issue a build:

 sbt clean compile assembly 


You can find the source code on GitHub.

On the next post, we shall deploy our Storm app to HDInsight.

Topics:
storm ,scala ,big data ,hadoop ,tutorial

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}