Word Count With Storm and Scala

Sure, knowing how to implement a word count Hadoop job using Scala and upload it to HDInsight is good, but having a real-world example of how to do so is better.

By Emmanouil Gkatziouras · Feb. 27, 17 · Tutorial

Apache Storm is a free and open-source distributed realtime computation system running on the JVM.

To get started, we will implement a very simple example. Previously, we implemented a word count Hadoop job using Scala and uploaded it to HDInsight. Here we keep the same word count concept but apply it to the real-time case by implementing a word count topology with Apache Storm. Our source code is based on the official Storm examples.

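Storm works with spouts and bolts. A spout is a source of streams that emits tuples into the topology; a bolt consumes tuples, processes them, and may emit new tuples of its own.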
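The flow from a spout through bolts can be modeled in plain Scala before bringing Storm in. The following is only a sketch of the idea, not Storm code: `PipelineSketch`, `sentenceSource`, `split`, and `count` are hypothetical names chosen for illustration, and an `Iterator` stands in for an unbounded stream.

```scala
object PipelineSketch {

  // Stand-in for a spout: a (finite) source of sentences.
  def sentenceSource: Iterator[String] =
    Iterator("the cow jumped over the moon", "snow white and the seven dwarfs")

  // Stand-in for a split bolt: one sentence in, many words out.
  def split(sentences: Iterator[String]): Iterator[String] =
    sentences.flatMap(_.split("\\s+")).filter(_.nonEmpty)

  // Stand-in for a count bolt: emits (word, running count) for every word seen.
  def count(words: Iterator[String]): Iterator[(String, Int)] = {
    val counts = scala.collection.mutable.Map.empty[String, Int]
    words.map { word =>
      val updated = counts.getOrElse(word, 0) + 1
      counts.update(word, updated)
      word -> updated
    }
  }

  def main(args: Array[String]): Unit =
    count(split(sentenceSource)).foreach { case (w, c) => println(s"$w -> $c") }
}
```

In a real topology each stage runs as many parallel tasks across worker processes; the chain of function calls here only mirrors the direction in which the data moves.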

First, we shall implement a spout that will emit fake data events — in our case, sentences.

 package com.gkatzioura.scala.storm

 import org.apache.storm.spout.SpoutOutputCollector
 import org.apache.storm.task.TopologyContext
 import org.apache.storm.topology.OutputFieldsDeclarer
 import org.apache.storm.topology.base.BaseRichSpout
 import org.apache.storm.tuple.{Fields, Values}
 import org.apache.storm.utils.Utils

 import scala.util.Random

 /**
  * Created by gkatzioura on 2/17/17.
  */
 class RandomSentenceSpout extends BaseRichSpout {

   var _collector: SpoutOutputCollector = _
   var _rand: Random = _

   override def nextTuple(): Unit = {
     Utils.sleep(100)
     val sentences = Array(
       "the cow jumped over the moon",
       "an apple a day keeps the doctor away",
       "four score and seven years ago",
       "snow white and the seven dwarfs",
       "i am at two with nature")
     val sentence = sentences(_rand.nextInt(sentences.length))
     _collector.emit(new Values(sentence))
   }

   override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
     _collector = collector
     _rand = Random
   }

   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
     declarer.declare(new Fields("word"))
   }
 }


The next step is to implement a bolt that splits each sentence into words and emits every word as its own tuple:

 package com.gkatzioura.scala.storm

 import java.text.BreakIterator

 import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
 import org.apache.storm.topology.base.BaseBasicBolt
 import org.apache.storm.tuple.{Fields, Tuple, Values}

 /**
  * Created by gkatzioura on 2/18/17.
  */
 class SplitSentenceBolt extends BaseBasicBolt {

   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
     val sentence = input.getString(0)
     val boundary = BreakIterator.getWordInstance
     boundary.setText(sentence)

     var start = boundary.first
     var end: Int = boundary.next
     // Advance before the check so substring is never called with BreakIterator.DONE (-1).
     while (end != BreakIterator.DONE) {
       val word = sentence.substring(start, end).replaceAll("\\s+", "")
       if (!word.equals("")) {
         collector.emit(new Values(word))
       }
       start = end
       end = boundary.next
     }
   }

   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
     declarer.declare(new Fields("word"))
   }
 }
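The boundary-walking logic of the split bolt can be tried outside Storm. Below is a minimal sketch, assuming a hypothetical helper object `WordSplitSketch`. Unlike the bolt, it also drops tokens that contain no letter or digit, because `BreakIterator` reports punctuation as separate segments; that filter is an addition for illustration and not part of the original code.

```scala
import java.text.BreakIterator
import scala.collection.mutable.ListBuffer

object WordSplitSketch {

  // Walks the word boundaries of a sentence and collects the word tokens.
  def splitWords(sentence: String): List[String] = {
    val boundary = BreakIterator.getWordInstance
    boundary.setText(sentence)

    val words = ListBuffer.empty[String]
    var start = boundary.first
    var end = boundary.next
    while (end != BreakIterator.DONE) {
      val token = sentence.substring(start, end)
      // Keep only tokens with at least one letter or digit
      // (skips whitespace and punctuation segments).
      if (token.exists(_.isLetterOrDigit)) words += token
      start = end
      end = boundary.next
    }
    words.toList
  }

  def main(args: Array[String]): Unit =
    println(splitWords("an apple a day keeps the doctor away"))
}
```

Keeping the splitting in a plain function like this makes it easy to unit test without starting a topology.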


The next step is the word count bolt:

 package com.gkatzioura.scala.storm

 import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
 import org.apache.storm.topology.base.BaseBasicBolt
 import org.apache.storm.tuple.{Fields, Tuple, Values}

 /**
  * Created by gkatzioura on 2/18/17.
  */
 class WordCountBolt extends BaseBasicBolt {

   // Running count per word, local to this bolt task.
   val counts = scala.collection.mutable.Map[String, Int]()

   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
     val word = input.getString(0)
     val count = counts.getOrElse(word, 0) + 1
     counts.put(word, count)
     // Emit the count for this word (not the whole map), boxed for the Java varargs constructor.
     collector.emit(new Values(word, Int.box(count)))
   }

   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
     declarer.declare(new Fields("word", "count"))
   }
 }


The final step is to create our topology. It submits to a cluster when a topology name is passed as the first argument and otherwise runs in a local, in-process cluster:

package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
 * Created by gkatzioura on 2/18/17.
 */
object WordCountTopology {

  def main(args: Array[String]): Unit = {
    println("Hello, world!")

    val builder = new TopologyBuilder
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()
    conf.setDebug(true)

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    } else {
      conf.setMaxTaskParallelism(3)
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}
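The `fieldsGrouping("split", new Fields("word"))` call matters for correctness: each count bolt task keeps its own map, so every occurrence of a given word has to reach the same task. The sketch below illustrates the idea with a hash of the word modulo the number of tasks. The helper `taskFor` is hypothetical, and Storm's actual routing is internal to the framework, so treat this only as a picture of the principle.

```scala
object FieldsGroupingSketch {

  // Hypothetical helper: picks a task index for a word.
  // The same word always maps to the same index for a fixed number of tasks.
  def taskFor(word: String, numTasks: Int): Int =
    Math.floorMod(word.hashCode, numTasks)

  def main(args: Array[String]): Unit = {
    val numTasks = 12 // same value as the parallelism hint of the "count" bolt
    val words = Seq("the", "cow", "the", "moon", "cow")

    // Group the words by the task that would receive them.
    val byTask = words.groupBy(taskFor(_, numTasks))
    byTask.toSeq.sortBy(_._1).foreach { case (task, ws) =>
      println(s"task $task receives ${ws.mkString(", ")}")
    }
  }
}
```

With `shuffleGrouping` instead, occurrences of the same word could land on different tasks, and each task would report only a partial count.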

Now, we shall build our app. To do so, we need to add the sbt-assembly plugin to our project/plugins.sbt file.

 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") 

Our SBT file is as follows:

 name := "ScalaStorm"
 version := "1.0"
 scalaVersion := "2.12.1"
 scalacOptions += "-Yresolve-term-conflict:package"
 libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"

Then, we issue a build:

 sbt clean compile assembly 


You can find the source code on GitHub.

In the next post, we shall deploy our Storm app to HDInsight.


Published at DZone with permission of Emmanouil Gkatziouras, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
