DZone
Big Data Zone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
  • Refcardz
  • Trend Reports
  • Webinars
  • Zones
  • |
    • Agile
    • AI
    • Big Data
    • Cloud
    • Database
    • DevOps
    • Integration
    • IoT
    • Java
    • Microservices
    • Open Source
    • Performance
    • Security
    • Web Dev
DZone > Big Data Zone > Word Count With Storm and Scala

Word Count With Storm and Scala

Sure, knowing how to implemented a word count Hadoop job using Scala and uploaded it to HDInsight is good — but having a real-world example of how to do so is better.

Emmanouil Gkatziouras user avatar by
Emmanouil Gkatziouras
CORE ·
Feb. 27, 17 · Big Data Zone · Tutorial
Like (7)
Save
Tweet
8.75K Views

Join the DZone community and get the full member experience.

Join For Free

Apache Storm is a free and open-source distributed realtime computation system running on the JVM.

To get started, we will implement a very simple example. Previously, we implemented a word count Hadoop job using Scala and uploaded it to HDInsight. We will focus on the same word count concept, but for real-time cases and implementing a word count topology utilizing Apache Storm. Our source code will be based on the official Storm examples.

Storm works with spouts and bolts.

First, we shall implement a spout that will emit fake data events — in our case, sentences.

 package com.gkatzioura.scala.storm
 import org.apache.storm.spout.SpoutOutputCollector
 import org.apache.storm.task.TopologyContext
 import org.apache.storm.topology.OutputFieldsDeclarer
 import org.apache.storm.topology.base.BaseRichSpout
 import org.apache.storm.tuple. {
  Fields,
  Values
 }
 import org.apache.storm.utils.Utils
 import scala.util.Random /** * Created by gkatzioura on 2/17/17. */ class RandomSentenceSpout extends BaseRichSpout {
  var _collector: SpoutOutputCollector = _
  var _rand: Random = _ override def nextTuple(): Unit = {
   Utils.sleep(100) val sentences = Array("the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature") val sentence = sentences(_rand.nextInt(sentences.length)) _collector.emit(new Values(sentence))
  }
  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
   _collector = collector _rand = Random
  }
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
   declarer.declare(new Fields("word"))
  }
 }


The next step is to implement a bolt that splits the sentences and emits them:

 package com.gkatzioura.scala.storm
 import java.text.BreakIterator
 import org.apache.storm.topology. {
  BasicOutputCollector,
  OutputFieldsDeclarer
 }
 import org.apache.storm.topology.base.BaseBasicBolt
 import org.apache.storm.tuple. {
  Fields,
  Tuple,
  Values
 } /** * Created by gkatzioura on 2/18/17. */
 class SplitSentenceBolt extends BaseBasicBolt {
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
   val sentence = input.getString(0) val boundary = BreakIterator.getWordInstance boundary.setText(sentence) var start = boundary.first
   var end: Int = start
   while (end != BreakIterator.DONE) {
    end = boundary.next val word = sentence.substring(start, end).replaceAll("\\s+", "") start = end
    if (!word.equals("")) {
     collector.emit(new Values(word))
    }
   }
  }
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
   declarer.declare(new Fields("word"))
  }
 }


The next step is the word count bolt:

 package com.gkatzioura.scala.storm
 import org.apache.storm.topology. {
  BasicOutputCollector,
  OutputFieldsDeclarer
 }
 import org.apache.storm.topology.base.BaseBasicBolt
 import org.apache.storm.tuple. {
   Fields,
   Tuple,
   Values
  }
  /** * Created by gkatzioura on 2/18/17. */
 class WordCountBolt extends BaseBasicBolt {
  val counts = scala.collection.mutable.Map[String, Int]()
  override def execute(input: Tuple, collector: BasicOutputCollector):
   Unit = {
    val word = input.getString(0)
    val optCount = counts.get(word)
    if (optCount.isEmpty) {
     counts.put(word, 1)
    } else {
     counts.put(word, optCount.get + 1)
    }
    collector.emit(new Values(word, counts))
   }
  override def declareOutputFields(declarer: OutputFieldsDeclarer):
   Unit = {
    declarer.declare(new Fields("word", "count"));
   }
 }


The final step is to create our topology, which takes care of whether we run locally or in a cluster environment:

package com.gkatzioura.scala.storm
import org.apache.storm. {
 Config,
 LocalCluster,
 StormSubmitter
}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields
 /** * Created by gkatzioura on 2/18/17. */
object WordCountTopology {
 def main(args: Array[String]): Unit = {
  println("Hello, world!")
  val builder = new TopologyBuilder
  builder.setSpout("spout", new RandomSentenceSpout, 5)
  builder.setBolt("split",
   new SplitSentenceBolt, 8).shuffleGrouping("spout")
  builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split",
   new Fields("word"))
  val conf = new Config()
  conf.setDebug(true)
  if (args != null && args.length > 0) {
   conf.setNumWorkers(3)
   StormSubmitter.submitTopology(args(0),
    conf, builder.createTopology())
  } else {
   conf.setMaxTaskParallelism(3)
   val cluster = new LocalCluster
   cluster.submitTopology("word-count", conf, builder.createTopology())
   Thread.sleep(10000) cluster.shutdown()
  }
 }
}

Now, we shall build our app. To do so we need to include the assembly plugin in our plugins.sbt file.

 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") 

Our SBT file is as follows:

 name := "ScalaStorm" version := "1.0" 
 scalaVersion := "2.12.1" 
 scalacOptions += "-Yresolve-term-conflict:package" 
 libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided" 

Then, we issue a build:

 sbt clean compile assembly 


You can find the source code on GitHub.

On the next post, we shall deploy our Storm app to HDInsight.

Scala (programming language)

Published at DZone with permission of Emmanouil Gkatziouras, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • DZone's Article Submission Guidelines
  • Biometric Authentication: Best Practices
  • API Security Tools: What To Look For
  • JIT Compilation of SQL in NoSQL

Comments

Big Data Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • MVB Program
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends:

DZone.com is powered by 

AnswerHub logo