Kafka Avro Scala Example

Learn how to write and read Avro-encoded messages to and from Kafka: how to produce messages encoded with Avro, send them to Kafka, consume them with a consumer, and finally decode them.

By Sushil Kumar Singh · Jan. 06, 17 · Opinion

Let's learn how to write and read messages in Avro format to and from Kafka. We will see how to produce messages encoded with Avro, how to send them to Kafka, how to consume them with a consumer, and finally how to decode them. Instead of plain-text messages, we will serialize our messages with Avro, which allows us to send much more complex data structures over the wire.

Avro

Apache Avro is a language-neutral data serialization format. Avro data is described by a language-independent schema. The schema is usually written in JSON, and serialization is usually to binary files, although serialization to JSON is also supported.

Let’s add the Avro dependency to our sbt build:

libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"

We will consider a schema like this (saved as schema.avsc on the classpath, e.g. under src/main/resources, since the producer and consumer below load it from there):

{
    "namespace": "kafka.avro.test",
    "type": "record",
    "name": "user",
    "fields": [
        { "name": "id", "type": "int" },
        { "name": "name", "type": "string" },
        { "name": "email", "type": ["string", "null"] }
    ]
}

You can instantiate the schema as follows:

val schema: Schema = new Schema.Parser().parse(SCHEMA_STRING)

Here, SCHEMA_STRING is the JSON listed above as a Java String.
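In Scala, SCHEMA_STRING can simply be a triple-quoted string literal holding the JSON above. A minimal sketch:

val SCHEMA_STRING: String =
  """
    |{
    |    "namespace": "kafka.avro.test",
    |    "type": "record",
    |    "name": "user",
    |    "fields": [
    |        { "name": "id", "type": "int" },
    |        { "name": "name", "type": "string" },
    |        { "name": "email", "type": ["string", "null"] }
    |    ]
    |}
  """.stripMargin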

Now, we can create an Avro generic record object with the instantiated schema and put user data into it.

val genericUser: GenericRecord = new GenericData.Record(schema)

genericUser.put("id", 1)
genericUser.put("name", "singh")
genericUser.put("email", null)

After creating the generic record, we need to serialize it. Here we will use the Avro binary encoder to encode the object into a byte array.

val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
writer.write(genericUser, encoder)
encoder.flush()
out.close()

val serializedBytes: Array[Byte] = out.toByteArray()

You can also use one of the many third-party APIs to serialize and deserialize Avro data; some of them may offer a friendlier interface.

So, it’s time to send the serialized message to Kafka using a producer. Here is the entire Kafka producer code:

Producer

import java.io.ByteArrayOutputStream
import java.util.{Properties, UUID}

import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
import org.apache.avro.specific.SpecificDatumWriter
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.io.Source

class KafkaProducer() {

  private val props = new Properties()

  props.put("metadata.broker.list", "localhost:9092")
  props.put("message.send.max.retries", "5")
  props.put("request.required.acks", "-1")
  props.put("serializer.class", "kafka.serializer.DefaultEncoder")
  props.put("client.id", UUID.randomUUID().toString())

  private val producer = new Producer[String, Array[Byte]](new ProducerConfig(props))

  val topic = "demo-topic"

  // Read the Avro schema file from the classpath
  val schema: Schema = new Parser().parse(Source.fromURL(getClass.getResource("/schema.avsc")).mkString)

  // Create an Avro generic record object
  val genericUser: GenericRecord = new GenericData.Record(schema)

  // Put data in that generic record
  genericUser.put("id", 1)
  genericUser.put("name", "sushil")
  genericUser.put("email", null)

  // Serialize the generic record into a byte array
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(genericUser, encoder)
  encoder.flush()
  out.close()

  val serializedBytes: Array[Byte] = out.toByteArray()

  // Send the serialized bytes to Kafka
  val queueMessage = new KeyedMessage[String, Array[Byte]](topic, serializedBytes)
  producer.send(queueMessage)
}
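Because everything happens in the class body, producing a message is just a matter of constructing the class. A minimal sketch of a driver (the object name is made up for illustration):

object ProducerApp {
  def main(args: Array[String]): Unit = {
    // Instantiating the class reads the schema, builds the record, and sends it to Kafka
    new KafkaProducer()
  }
}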

Now, just as the producer sends the binary message, we will create a consumer that consumes the message from Kafka, deserializes it, and rebuilds the generic record from it.

Consumer

import java.util.Properties

import domain.User
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.{DatumReader, Decoder, DecoderFactory}
import org.apache.avro.specific.SpecificDatumReader
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder

import scala.io.Source

class KafkaConsumer() {

  private val props = new Properties()

  val groupId = "demo-topic-consumer"
  val topic = "demo-topic"

  props.put("group.id", groupId)
  props.put("zookeeper.connect", "localhost:2181")
  props.put("auto.offset.reset", "smallest")
  props.put("consumer.timeout.ms", "120000")
  props.put("auto.commit.interval.ms", "10000")

  private val consumerConfig = new ConsumerConfig(props)
  private val consumerConnector = Consumer.create(consumerConfig)
  private val filterSpec = new Whitelist(topic)
  private val streams =
    consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  // Read the Avro schema file from the classpath
  val schemaString = Source.fromURL(getClass.getResource("/schema.avsc")).mkString
  // Initialize the schema
  val schema: Schema = new Schema.Parser().parse(schemaString)

  def read(): Option[User] =
    try {
      if (hasNext) {
        println("Getting message from queue.............")
        val message: Array[Byte] = iterator.next().message()
        getUser(message)
      } else {
        None
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  private def hasNext: Boolean =
    try {
      iterator.hasNext()
    } catch {
      case timeOutEx: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Got error when reading message")
        false
    }

  private def getUser(message: Array[Byte]): Option[User] = {

    // Deserialize the bytes and rebuild the generic record
    val reader: DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(message, null)
    val userData: GenericRecord = reader.read(null, decoder)

    // Map the generic record onto the User domain object
    val user = User(
      userData.get("id").toString.toInt,
      userData.get("name").toString,
      Option(userData.get("email")).map(_.toString)
    )
    Some(user)
  }
}
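The consumer imports and constructs domain.User, which this post does not show. Presumably it is a simple case class along these lines (an assumption, mirroring the schema fields):

package domain

// Assumed domain class: id and name are required, email is optional (nullable in the schema)
case class User(id: Int, name: String, email: Option[String])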

Conclusion

In this post, we have seen how to produce messages encoded with Avro, how to send them to Kafka, how to consume them with a consumer, and finally how to decode them. This lets us build a messaging system for complex data with the help of Kafka and Avro.

One thing you have to note is that the same Avro schema must be present on both sides (producer and consumer) to encode and decode messages, and any change to the schema must be applied on both sides. To overcome this problem, the Confluent Platform comes into play with its Schema Registry, which allows us to share Avro schemas and handle schema changes.
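As a rough sketch of what that looks like, Confluent's Avro serializer registers and looks up schemas in the Schema Registry automatically. The object name, broker address, registry URL, and topic below are assumptions, and the snippet needs Confluent's kafka-avro-serializer artifact on the classpath:

import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.io.Source

object SchemaRegistryProducerSketch {
  def main(args: Array[String]): Unit = {
    // Build the same generic record as before
    val schema: Schema = new Schema.Parser().parse(Source.fromURL(getClass.getResource("/schema.avsc")).mkString)
    val genericUser: GenericRecord = new GenericData.Record(schema)
    genericUser.put("id", 1)
    genericUser.put("name", "sushil")
    genericUser.put("email", null)

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Confluent's serializer handles schema registration and Avro encoding for us
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("schema.registry.url", "http://localhost:8081")

    val producer = new KafkaProducer[String, GenericRecord](props)
    producer.send(new ProducerRecord[String, GenericRecord]("demo-topic", genericUser))
    producer.close()
  }
}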

You can find complete code on GitHub.


Published at DZone with permission of Sushil Kumar Singh, DZone MVB.

Opinions expressed by DZone contributors are their own.
