
Spark Corner Cases

In this post, we're going to look at methods and situations you might not encounter in your everyday Spark job, but that will come in handy when the need arises.


In the last two posts, we covered alternatives to the groupByKey method: aggregateByKey and combineByKey. In this post, we're going to look at methods and situations you might not encounter in your everyday Spark job, but that will come in handy when the need arises.

Stripping the First Line (or First N Lines) of a File

While doing some testing, I wanted to strip off the first line of a file. Granted, I could have easily created a copy with the header removed, but curiosity got the better of me. Just how would I remove the first line of a file? The answer is to use the mapPartitionsWithIndex method.

val inputData = sc.parallelize(Array("Skip this line XXXXX","Start here instead AAAA","Second line to work with BBB"))

val valuesRDD = inputData.mapPartitionsWithIndex((idx, iter) => skipLines(idx, iter, numLinesToSkip))

def skipLines(index: Int, lines: Iterator[String], num: Int): Iterator[String] = {
    if (index == 0) {
      lines.drop(num)
    }
    lines
  }

Here are the results from printing the resulting RDD:

Start here instead AAAA
Second line to work with BBB

The mapPartitionsWithIndex method passes the zero-based index of the partition and an iterator over the partition's data to a user-provided function (here, skipLines). Using this approach, we could remove any number of lines or work with the entire partition at one time (but that's the subject of another post!).
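The same approach works for a file read with textFile, since for a single input file the header lines land in partition 0. Here's a minimal sketch, assuming a CSV file with a single header line; the path is just a placeholder:

// Hypothetical example: drop one header line from a file read via textFile.
// The header sits at the start of the file, so only partition 0 is trimmed.
val csv = sc.textFile("/path/to/data.csv")
val noHeader = csv.mapPartitionsWithIndex((idx, iter) => skipLines(idx, iter, 1))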

Avoiding Lists of Iterators

Often when reading in a file, we want to work with the individual values contained in each line separated by some delimiter. Splitting a delimited line is a trivial operation:

val newRDD = textRDD.map(line => line.split(","))

But the issue here is that the returned RDD contains an array of values for each line, not the values themselves. What we want is the individual values obtained after calling the split function; in other words, we need an Array[String], not an Array[Array[String]]. For this we would use the flatMap function. For those with a functional programming background, using a flatMap operation is nothing new, but if you are new to functional programming it's a great operation to become familiar with.

val inputData = sc.parallelize(Array("foo,bar,baz", "larry,moe,curly", "one,two,three")).cache()

val mapped = inputData.map(line => line.split(","))
val flatMapped = inputData.flatMap(line => line.split(","))

val mappedResults = mapped.collect()
val flatMappedResults = flatMapped.collect()

println("Mapped results of split")
println(mappedResults.mkString(" : "))

println("FlatMapped results of split")
println(flatMappedResults.mkString(" : "))

When we run the program we see these results:

Mapped results of split
[Ljava.lang.String;@45e22def : [Ljava.lang.String;@6ae3fb94 : [Ljava.lang.String;@4417af13
FlatMapped results of split
foo : bar : baz : larry : moe : curly : one : two : three

As we can see, the map example returned an Array containing three Array[String] instances (printed using the default Array toString), while the flatMap call returned the individual values contained in one Array.
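If we want to see the contents of the nested arrays rather than their default toString output, one option (not part of the original example) is to format each inner array with mkString before joining:

// Render each inner Array[String] so its contents are visible
println(mappedResults.map(_.mkString("[", ",", "]")).mkString(" : "))
// prints: [foo,bar,baz] : [larry,moe,curly] : [one,two,three]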

Maintaining an RDD’s Original Partitioning

When working in Spark, you quickly come up to speed with the map function. It’s used to take a collection of values and ‘map’ them into another type. Typically, when working with key-value pairs, we don’t need the key to remain the same type, if we need to keep the key at all. However, there are cases where we need to keep the RDD’s original partitioning. This presents a problem: since map can change the keys, Spark cannot guarantee the partitioning still holds, so the resulting RDD loses its partitioner. What’s our option? We use the mapValues function (on the PairRDDFunctions class), which changes only the values and therefore retains the original keys and partitioning of the RDD.

// Details (including the printResults helper, which joins the tuples with " -> ") left out for clarity
val data = sc.parallelize(Array((1, "foo"), (2, "bar"), (3, "baz"))).cache()
val mappedRdd = data.map(x => (x._1, x._2.toUpperCase))
val mappedValuesRdd = data.mapValues(s => s.toUpperCase)

println("Mapping Results")
printResults(mappedRdd.collect())
printResults(mappedValuesRdd.collect())

And we’ll see the results from our simple example are exactly the same:

Mapping Results
(1,FOO) -> (2,BAR) -> (3,BAZ)
(1,FOO) -> (2,BAR) -> (3,BAZ)
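The printed values are identical, but the partitioning behavior is not. A quick way to confirm the difference, as a small addition to the example above (not part of the original output), is to give the pair RDD an explicit partitioner and then inspect the partitioner of each result:

import org.apache.spark.HashPartitioner

// Partition the pair RDD explicitly, then check what each transformation preserves
val partitioned = data.partitionBy(new HashPartitioner(2)).cache()

println(partitioned.map(x => (x._1, x._2.toUpperCase)).partitioner)  // None: map discards the partitioner
println(partitioned.mapValues(_.toUpperCase).partitioner)            // Some(org.apache.spark.HashPartitioner@...): preserved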

Conclusion

This has been a short tour of some methods we can use when we encounter an unfamiliar situation while writing Spark jobs. Thanks for your time.



Published at DZone with permission of Bill Bejeck, DZone MVB.
