
Fighting NotSerializableException in Apache Spark

Using the Spark context in a class constructor can cause serialization issues. Moving the logic and variables into a member method avoids some of these problems.

By Rado Buranský · Mar. 11, 16 · Analysis


There are many reasons why you can get the nasty SparkException: Task not serializable. StackOverflow is full of answers, but this one was not so obvious. At least not to me.

I had a simple Spark application which created a direct stream to Kafka, did some filtering, and then saved the results to Cassandra. When I ran it, I got an exception saying that the filtering task could not be serialized. Check the code and try to tell me what's wrong with it:

import akka.actor._
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

class MyActor(ssc: StreamingContext) extends Actor {
  // Create direct stream to Kafka
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, ...)

  // Save raw data to Cassandra
  kafkaStream.saveToCassandra("cassandraKeyspace", "cassandraTableRaw")

  // Get some data from another Cassandra table
  val someTable = ssc.sparkContext.cassandraTable[SomeTable]("cassandraKeyspace", "someTable")

  // Filter and save data to Cassandra
  kafkaStream
    .filter { case (_, message) =>
      // Whatever logic can be here; the point is that "someTable" is used
      someTable.filter(_.message == message).count > 42
    }
    .saveToCassandra("cassandraKeyspace", "cassandraTableAggNewVisitors")

  def receive = Actor.emptyBehavior
}

OK. Do you see that someTable variable inside the filter function? That's the cause of the problem. It is an RDD, which is, of course, serializable by definition. At first I thought that the concrete implementation was for some reason not serializable, but that is also the wrong way of thinking.

To whom does the variable belong? I looked at it as a "local" variable inside the class constructor. But it's not. The someTable variable is a public member of the MyActor class! It belongs to the class, which is not serializable. (Side note: we don't want Akka actors to be serializable, because it doesn't make sense to send actors over the wire.)
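
The same trap can be reproduced without Kafka, Cassandra, or Akka. Here is a minimal, hypothetical sketch (the Owner class, its field, and the numbers are made up for illustration): referencing a class field inside a closure compiles to a reference to this, so Spark tries to serialize the whole non-serializable enclosing instance.

import org.apache.spark.SparkContext

// Hypothetical class; deliberately not Serializable
class Owner(sc: SparkContext) {
  val threshold = 42 // a public member, i.e. this.threshold

  // The closure mentions "threshold", which the compiler rewrites to
  // "this.threshold", so the whole Owner instance becomes part of the task.
  // Fails with SparkException: Task not serializable.
  def broken(): Long =
    sc.parallelize(1 to 100).filter(_ > threshold).count()

  // Copying the field into a method-local val removes the reference to
  // "this"; only the Int is captured and the task serializes fine.
  def fixed(): Long = {
    val localThreshold = threshold
    sc.parallelize(1 to 100).filter(_ > localThreshold).count()
  }
}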

That explains everything. Spark needs to serialize the whole closure, and the actor instance is part of it. Let's just put the whole logic inside a method. That makes all the variables method-local, so the actor no longer has to be serialized.

import akka.actor._
import org.apache.spark.streaming.StreamingContext

class MyActor(ssc: StreamingContext) extends Actor {
  def init(): Unit = {
    // Create direct stream to Kafka ... the same code as before, only inside this method
    val kafkaStream = ...
    ...
  }

  init()

  def receive = Actor.emptyBehavior
}
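
For completeness, here is a sketch of what the filled-in version might look like: it is just the first listing moved into init(), with hypothetical Kafka parameters and a hypothetical SomeTable row class standing in for the details the original elides. someTable is now a method-local val, so the filter closure no longer drags the actor along with it.

import akka.actor._
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical row class for the "someTable" Cassandra table; the original
// only tells us it has a "message" column
case class SomeTable(message: String)

class MyActor(ssc: StreamingContext) extends Actor {
  def init(): Unit = {
    // Everything from the first listing, now method-local
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      Map("metadata.broker.list" -> "localhost:9092"), // hypothetical broker list
      Set("someTopic")                                 // hypothetical topic
    )

    kafkaStream.saveToCassandra("cassandraKeyspace", "cassandraTableRaw")

    // A local val now: the filter closure captures only this RDD,
    // not the MyActor instance
    val someTable = ssc.sparkContext.cassandraTable[SomeTable]("cassandraKeyspace", "someTable")

    kafkaStream
      .filter { case (_, message) =>
        // Same illustrative logic as in the first listing
        someTable.filter(_.message == message).count > 42
      }
      .saveToCassandra("cassandraKeyspace", "cassandraTableAggNewVisitors")
  }

  init()

  def receive = Actor.emptyBehavior
}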

How simple. You’re welcome.

Apache Spark

Published at DZone with permission of Rado Buranský, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
