Fighting NotSerializableException in Apache Spark

Using the Spark context in a class constructor can cause serialization issues. Move the logic and variables into a member method to avoid some of these problems.

There are many reasons why you can get the nasty SparkException: Task not serializable. Stack Overflow is full of answers, but this case was not so obvious. At least not to me.

I had a simple Spark application that created a direct stream to Kafka, did some filtering, and then saved the results to Cassandra. When I ran it, I got an exception saying that the filtering task could not be serialized. Check the code and try to tell me what’s wrong with it:

import akka.actor._
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

class MyActor(ssc: StreamingContext) extends Actor {
  // Create direct stream to Kafka
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, ...)

  // Save raw data to Cassandra
  kafkaStream.saveToCassandra("cassandraKeyspace", "cassandraTableRaw")

  // Get some data from another Cassandra table
  val someTable = ssc.sparkContext.cassandraTable[SomeTable]("cassandraKeyspace", "someTable")

  // Filter and save data to Cassandra
  kafkaStream
    .filter { message =>
      // Whatever logic can be here, the point is that "someTable" is used
      someTable.filter(_.message == message).count > 42
    }
    .saveToCassandra("cassandraKeyspace", "cassandraTableAggNewVisitors")

  def receive = Actor.emptyBehavior
}

OK. Do you see the someTable variable inside the filter function? That’s the cause of the problem. It is an RDD, which is, of course, serializable by definition. At first I thought that its concrete implementation was, for some reason, not serializable, but that is the wrong way of thinking too.

Which object does the variable belong to? I looked at it as a “local” variable inside the class constructor. But it’s not. The someTable variable is a public member of the MyActor class! It belongs to a MyActor instance, and that instance is not serializable. (Side note: we don’t want Akka actors to be serializable, because it doesn’t make sense to send actors over the wire.)
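To check that a val written in a class body really is a member, here is a minimal sketch in plain Scala, with no Akka or Spark required. ConstructorStyle and MethodStyle are hypothetical stand-ins for the two shapes of MyActor, and the check simply lists, via Java reflection, the fields the compiler generated (assuming Scala 2.12 or later).

```scala
// Hypothetical classes mirroring the two versions of MyActor (no Akka or Spark needed).
class ConstructorStyle(source: Seq[String]) {
  // Looks "local" to the constructor, but becomes a field with a public getter.
  val someTable: Seq[String] = source.filter(_.nonEmpty)
}

class MethodStyle(source: Seq[String]) {
  def init(): Int = {
    // A true local: it exists only while init() runs and never becomes a field.
    val someTable = source.filter(_.nonEmpty)
    someTable.size
  }
}

object MemberOrLocal {
  // Names of the fields the compiler actually generated for a class.
  def fieldNames(cls: Class[_]): Set[String] =
    cls.getDeclaredFields.map(_.getName).toSet

  def main(args: Array[String]): Unit = {
    println(fieldNames(classOf[ConstructorStyle]).contains("someTable")) // true
    println(fieldNames(classOf[MethodStyle]).contains("someTable"))      // false
  }
}
```

Any closure that reads someTable in the constructor version therefore reads it through the enclosing instance, not through a standalone local value.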

That explains everything. Spark needs to serialize the whole closure, and because the closure reads someTable, which really means this.someTable, the actor instance is part of it. Let’s just put the whole logic inside a method. That makes all the variables method-local, so the closure no longer references the actor and the actor doesn’t have to be serialized anymore.
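The capture can be reproduced without Spark. The sketch below assumes Scala 2.12 or 2.13, where function literals are serializable and capture `this` only when their body needs it. It serializes two closures with plain Java serialization, which is what Spark uses to ship task closures. NotSerializableOwner is a hypothetical stand-in for the actor: one closure reads a member val, the other reads a method-local value.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for MyActor: like an Akka actor, it does NOT extend Serializable.
class NotSerializableOwner {
  // A member val, like someTable in the original MyActor.
  val threshold: Int = 42

  // Reading `threshold` really means `this.threshold`,
  // so the closure captures the whole NotSerializableOwner instance.
  def memberClosure: Int => Boolean = (n: Int) => n > threshold

  // The article's fix: keep the value method-local.
  // The closure now captures only a plain Int, not `this`.
  def localClosure(limit: Int): Int => Boolean = {
    val localThreshold = limit
    (n: Int) => n > localThreshold
  }
}

object ClosureCaptureDemo {
  // Plain Java serialization, the mechanism Spark uses for task closures.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val owner = new NotSerializableOwner
    println(isSerializable(owner.memberClosure))    // false
    println(isSerializable(owner.localClosure(42))) // true
  }
}
```

Note that the fix does not make the owner serializable; it only removes the owner from what the closure captures.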

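import akka.actor._
import org.apache.spark.streaming.StreamingContext

class MyActor(ssc: StreamingContext) extends Actor {
  def init(): Unit = {
    // Create direct stream to Kafka ... the same code as before, only inside this method,
    // so kafkaStream, someTable, etc. are method-local and the closure no longer captures the actor
    val kafkaStream = ...
  }

  // Assumed call site (not shown in the original): run the setup at construction time, as before
  init()

  def receive = Actor.emptyBehavior
}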

How simple. You’re welcome.

Published at DZone with permission of Rado Buranský, DZone MVB. See the original article here.
