
Fighting NotSerializableException in Apache Spark

Using the Spark context in a class constructor can cause serialization issues. Moving the logic and variables into a member method avoids some of these problems.

There are many reasons why you can get the nasty SparkException: Task not serializable. Stack Overflow is full of answers, but this one was not so obvious, at least not to me.

I had a simple Spark application that created a direct stream to Kafka, did some filtering, and then saved the results to Cassandra. When I ran it, I got an exception saying that the filtering task could not be serialized. Check the code and try to tell me what's wrong with it:

import akka.actor._
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

class MyActor(ssc: StreamingContext) extends Actor {
  // Create a direct stream to Kafka
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, ...)

  // Save the raw data to Cassandra
  kafkaStream.saveToCassandra("cassandraKeyspace", "cassandraTableRaw")

  // Get some data from another Cassandra table
  val someTable = ssc.sparkContext.cassandraTable[SomeTable]("cassandraKeyspace", "someTable")

  // Filter and save the data to Cassandra
  kafkaStream
    .filter { message =>
      // Whatever logic can be here; the point is that "someTable" is used
      someTable.filter(_.message == message).count > 42
    }
    .saveToCassandra("cassandraKeyspace", "cassandraTableAggNewVisitors")

  def receive = Actor.emptyBehavior
}

OK. Do you see that someTable variable inside the filter function? That's the cause of the problem. It is an RDD, which is, of course, serializable by definition. At first I thought the concrete implementation was for some reason not serializable, but that was the wrong way of thinking, too.

Who does the variable belong to? I looked at it as a "local" variable inside the class constructor. But it's not. The someTable variable is a public member of the MyActor class! It belongs to a class that is not serializable. (Side note: we don't want Akka actors to be serializable because it doesn't make sense to send actors over the wire.)
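To see the same trap in isolation, here is a minimal, hypothetical sketch (not from the original application, and without Akka, Kafka, or Cassandra): the threshold field looks like a local value, but referencing it inside the closure drags the whole non-serializable enclosing instance into the task.

import org.apache.spark.SparkContext

// Hypothetical example for illustration only.
class Pipeline(sc: SparkContext) {
  val threshold = 42                       // a public member of Pipeline
  val numbers = sc.parallelize(1 to 100)

  // Typically fails with "Task not serializable": the closure reads
  // this.threshold, so Spark has to ship the enclosing Pipeline instance,
  // which is not Serializable.
  val big = numbers.filter(_ > threshold)
}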

That explains everything. Spark needs to serialize the whole closure, and the actor instance is part of it. Let's just put the whole logic inside a method. That makes all the variables method-local, so the actor no longer has to be serialized.

import akka.actor._

class MyActor(ssc: StreamingContext) extends Actor {
  def init(): Unit = {
    // Create a direct stream to Kafka ... the same code as before, only inside this method
    val kafkaStream = ...
    ...
  }

  init()

  def receive = Actor.emptyBehavior
}
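For completeness, another idiom you will often see in Spark code for the same problem (not used in the original article, sketched here on the same hypothetical Pipeline class) is to copy the member into a local val just before building the closure, so the closure captures only the copy instead of the enclosing instance.

import org.apache.spark.SparkContext

// Hypothetical example for illustration only.
class Pipeline(sc: SparkContext) {
  val threshold = 42
  val numbers = sc.parallelize(1 to 100)

  val big = {
    val localThreshold = threshold         // local copy of the member
    // The closure now references only localThreshold, so Spark serializes
    // just an Int instead of the whole Pipeline instance.
    numbers.filter(_ > localThreshold)
  }
}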

How simple. You’re welcome.
