
Fighting NotSerializableException in Apache Spark


Using the Spark context in a class constructor can cause serialization issues. Move the logic and variables into a member method to avoid some of these problems.


There are many reasons why you can get this nasty SparkException: Task not serializable. StackOverflow is full of answers, but this one was not so obvious, at least not to me.

I had a simple Spark application which created a direct stream to Kafka, did some filtering, and then saved the results to Cassandra. When I ran it, I got an exception saying that the filtering task could not be serialized. Check the code and try to tell me what’s wrong with it:

import akka.actor._
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

class MyActor(ssc: StreamingContext) extends Actor {
  // Create direct stream to Kafka
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, ...)

  // Save raw data to Cassandra
  kafkaStream.saveToCassandra("cassandraKeyspace", "cassandraTableRaw")

  // Get some data from another Cassandra table
  val someTable = ssc.sparkContext.cassandraTable[SomeTable]("cassandraKeyspace", "someTable")

  // Filter and save data to Cassandra
  kafkaStream
    .filter { message =>
      // Whatever logic can be here, the point is that "someTable" is used
      someTable.filter(_.message == message).count > 42
    }
    .saveToCassandra("cassandraKeyspace", "cassandraTableAggNewVisitors")

  def receive = Actor.emptyBehavior
}

OK. Do you see that someTable variable inside the filter function? That’s the cause of the problem. It is an RDD, which is, of course, serializable by definition. At first I thought that the concrete implementation was for some reason not serializable, but that was also the wrong way of thinking.

Whom does the variable belong to? I looked at it as a “local” variable inside the class constructor. But it’s not. The someTable variable is a public member of the MyActor class! It belongs to the class, which is not serializable. (Side note: we don’t want Akka actors to be serializable because it doesn’t make sense to send actors over the wire.)
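To make the capture explicit, here is a minimal sketch of the same trap with hypothetical names and no Akka involved: referencing a field inside a closure compiles to this.field, so the closure drags the whole enclosing instance into the task, and Spark has to serialize it.

class NotSerializableHolder(sc: org.apache.spark.SparkContext) {
  val threshold = 42 // a field of this class, not a local value

  def brokenCount(): Long =
    sc.parallelize(1 to 100)
      // "threshold" really means "this.threshold", so the closure captures
      // the whole NotSerializableHolder instance and the job fails with
      // SparkException: Task not serializable
      .filter(_ > threshold)
      .count()
}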

That explains everything. Spark needs to serialize the whole closure, and the actor instance is a part of it. Let’s just put the whole logic inside a method. That makes all the variables method-local, so the actor doesn’t have to be serialized anymore.

import akka.actor._

class MyActor(ssc: StreamingContext) extends Actor {
  def init(): Unit = {
    // Create direct stream to Kafka ... the same code as before, only inside this method
    val kafkaStream = ...
    ...
  }

  init()

  def receive = Actor.emptyBehavior
}
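For the record, a common variation on the same idea (a sketch with hypothetical names, not code from the original article) is to copy the field into a local val just before the closure, so that only the local value is captured instead of the enclosing instance:

class NotSerializableHolder(sc: org.apache.spark.SparkContext) {
  val threshold = 42

  def workingCount(): Long = {
    val localThreshold = threshold // method-local copy; only this Int is captured
    sc.parallelize(1 to 100)
      .filter(_ > localThreshold)  // no reference to "this" inside the closure
      .count()
  }
}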

How simple. You’re welcome.



Published at DZone with permission of Rado Buranský, DZone MVB. See the original article here.

