Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Word Count With Storm and Scala

DZone's Guide to

Word Count With Storm and Scala

Sure, knowing how to implemented a word count Hadoop job using Scala and uploaded it to HDInsight is good — but having a real-world example of how to do so is better.

Free Resource

Learn how you can maximize big data in the cloud with Apache Hadoop. Download this eBook now. Brought to you in partnership with Hortonworks.

Apache Storm is a free and open-source distributed realtime computation system running on the JVM.

To get started, we will implement a very simple example. Previously, we implemented a word count Hadoop job using Scala and uploaded it to HDInsight. We will focus on the same word count concept, but for real-time cases and implementing a word count topology utilizing Apache Storm. Our source code will be based on the official Storm examples.

Storm works with spouts and bolts.

First, we shall implement a spout that will emit fake data events — in our case, sentences.

 package com.gkatzioura.scala.storm
 import org.apache.storm.spout.SpoutOutputCollector
 import org.apache.storm.task.TopologyContext
 import org.apache.storm.topology.OutputFieldsDeclarer
 import org.apache.storm.topology.base.BaseRichSpout
 import org.apache.storm.tuple. {
  Fields,
  Values
 }
 import org.apache.storm.utils.Utils
 import scala.util.Random /** * Created by gkatzioura on 2/17/17. */ class RandomSentenceSpout extends BaseRichSpout {
  var _collector: SpoutOutputCollector = _
  var _rand: Random = _ override def nextTuple(): Unit = {
   Utils.sleep(100) val sentences = Array("the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature") val sentence = sentences(_rand.nextInt(sentences.length)) _collector.emit(new Values(sentence))
  }
  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
   _collector = collector _rand = Random
  }
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
   declarer.declare(new Fields("word"))
  }
 }


The next step is to implement a bolt that splits the sentences and emits them:

 package com.gkatzioura.scala.storm
 import java.text.BreakIterator
 import org.apache.storm.topology. {
  BasicOutputCollector,
  OutputFieldsDeclarer
 }
 import org.apache.storm.topology.base.BaseBasicBolt
 import org.apache.storm.tuple. {
  Fields,
  Tuple,
  Values
 } /** * Created by gkatzioura on 2/18/17. */
 class SplitSentenceBolt extends BaseBasicBolt {
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
   val sentence = input.getString(0) val boundary = BreakIterator.getWordInstance boundary.setText(sentence) var start = boundary.first
   var end: Int = start
   while (end != BreakIterator.DONE) {
    end = boundary.next val word = sentence.substring(start, end).replaceAll("\\s+", "") start = end
    if (!word.equals("")) {
     collector.emit(new Values(word))
    }
   }
  }
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
   declarer.declare(new Fields("word"))
  }
 }


The next step is the word count bolt:

 package com.gkatzioura.scala.storm
 import org.apache.storm.topology. {
  BasicOutputCollector,
  OutputFieldsDeclarer
 }
 import org.apache.storm.topology.base.BaseBasicBolt
 import org.apache.storm.tuple. {
   Fields,
   Tuple,
   Values
  }
  /** * Created by gkatzioura on 2/18/17. */
 class WordCountBolt extends BaseBasicBolt {
  val counts = scala.collection.mutable.Map[String, Int]()
  override def execute(input: Tuple, collector: BasicOutputCollector):
   Unit = {
    val word = input.getString(0)
    val optCount = counts.get(word)
    if (optCount.isEmpty) {
     counts.put(word, 1)
    } else {
     counts.put(word, optCount.get + 1)
    }
    collector.emit(new Values(word, counts))
   }
  override def declareOutputFields(declarer: OutputFieldsDeclarer):
   Unit = {
    declarer.declare(new Fields("word", "count"));
   }
 }


The final step is to create our topology, which takes care of whether we run locally or in a cluster environment:

package com.gkatzioura.scala.storm
import org.apache.storm. {
 Config,
 LocalCluster,
 StormSubmitter
}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields
 /** * Created by gkatzioura on 2/18/17. */
object WordCountTopology {
 def main(args: Array[String]): Unit = {
  println("Hello, world!")
  val builder = new TopologyBuilder
  builder.setSpout("spout", new RandomSentenceSpout, 5)
  builder.setBolt("split",
   new SplitSentenceBolt, 8).shuffleGrouping("spout")
  builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split",
   new Fields("word"))
  val conf = new Config()
  conf.setDebug(true)
  if (args != null && args.length > 0) {
   conf.setNumWorkers(3)
   StormSubmitter.submitTopology(args(0),
    conf, builder.createTopology())
  } else {
   conf.setMaxTaskParallelism(3)
   val cluster = new LocalCluster
   cluster.submitTopology("word-count", conf, builder.createTopology())
   Thread.sleep(10000) cluster.shutdown()
  }
 }
}

Now, we shall build our app. To do so we need to include the assembly plugin in our plugins.sbt file.

 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") 

Our SBT file is as follows:

 name := "ScalaStorm" version := "1.0" 
 scalaVersion := "2.12.1" 
 scalacOptions += "-Yresolve-term-conflict:package" 
 libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided" 

Then, we issue a build:

 sbt clean compile assembly 


You can find the source code on GitHub.

On the next post, we shall deploy our Storm app to HDInsight.

Hortonworks DataFlow is an integrated platform that makes data ingestion fast, easy, and secure. Download the white paper now.  Brought to you in partnership with Hortonworks

Topics:
storm ,scala ,big data ,hadoop ,tutorial

Published at DZone with permission of Emmanouil Gkatziouras, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

The best of DZone straight to your inbox.

SEE AN EXAMPLE
Please provide a valid email address.

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.
Subscribe

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}