
Implementing Hadoop's Input and Output Format in Spark

A detailed tutorial on how to use Apache Spark to implement Hadoop input and output formats.


In this post, we will discuss how to use Hadoop input and output formats in Spark.

To follow the concepts explained here, it helps to have some basic knowledge of Apache Spark. We recommend reading the following posts first: Beginners Guide to Spark and Spark RDD's in Scala.

First, let’s discuss Hadoop's input formats. An input split is a logical chunk of the input data stored in HDFS, and each mapper processes one input split. Before the data reaches the map method, a RecordReader reads the input split and turns it into key-value records.

The InputFormat describes the input-specification for a Map-Reduce job.

The Map-Reduce framework relies on the InputFormat of the job to do the following:

  • Validate the input-specification of the job.
  • Split up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper.
  • Provide the RecordReader implementation used to extract input records from the logical InputSplit for processing by the Mapper.

The default behavior of file-based InputFormats, typically subclasses of FileInputFormat, is to split the input into logical InputSplits based on the total size, in bytes, of the input files. However, the FileSystem block size of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapreduce.input.fileinputformat.split.minsize.
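The split rule above can be made concrete with a small pure-Scala model. It assumes it mirrors the new-API FileInputFormat (its computeSplitSize method and the 10% "slop" loop in getSplits); the object name SplitMath is hypothetical, and the real code also handles block locations and non-splittable files. The formula also shows that the block size caps the split size only while the minimum split size is below it.

```scala
// Pure-Scala model of how the new-API FileInputFormat sizes input splits.
// Sizes are in bytes; block locations and non-splittable files are ignored.
object SplitMath {
  // A final split may be up to 10% larger than the split size (Hadoop's SPLIT_SLOP).
  val SplitSlop = 1.1

  // minSize ~ mapreduce.input.fileinputformat.split.minsize,
  // maxSize ~ mapreduce.input.fileinputformat.split.maxsize.
  def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long =
    math.max(minSize, math.min(maxSize, blockSize))

  // Lengths of the splits cut from one non-empty, splittable file.
  def splitLengths(fileLength: Long, splitSize: Long): Seq[Long] = {
    val lengths = scala.collection.mutable.ArrayBuffer.empty[Long]
    var remaining = fileLength
    while (remaining.toDouble / splitSize > SplitSlop) {
      lengths += splitSize
      remaining -= splitSize
    }
    if (remaining != 0) lengths += remaining
    lengths.toList
  }
}
```

For example, a 300 MB file with a 128 MB split size yields splits of 128, 128, and 44 MB, while a 130 MB file stays a single split because it exceeds the split size by less than 10%.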

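By default, Hadoop uses TextInputFormat, where the key of each record is the byte offset of the line in the file and the value is the contents of the line.

A related format, KeyValueTextInputFormat, instead splits each line at the first tab character: the text before the tab becomes the key and the rest becomes the value. Keys and values in Hadoop are Writable types, so they can be serialized.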
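To make the difference concrete: with TextInputFormat the key is the line's byte offset and the value is the line, while KeyValueTextInputFormat splits each line at the first tab. The sketch below is an illustrative pure-Scala model of the pairs each format produces, not Hadoop's RecordReader code; RecordModel is a hypothetical name, and it assumes UTF-8 text with "\n" line endings.

```scala
// Illustrative model (not Hadoop code) of the (key, value) pairs the two
// input formats produce. Assumes UTF-8 text with "\n" line endings.
object RecordModel {
  // TextInputFormat: key = byte offset where the line starts, value = the line.
  def textInputRecords(content: String): Seq[(Long, String)] = {
    val lines = content.split("\n").toSeq
    // Each offset is the previous offset plus the line's byte length plus 1 for "\n".
    val offsets = lines.scanLeft(0L)((offset, line) => offset + line.getBytes("UTF-8").length + 1)
    offsets.zip(lines)
  }

  // KeyValueTextInputFormat: split at the FIRST separator (a tab by default);
  // a line with no separator becomes the key and the value is empty.
  def keyValueRecord(line: String, separator: Char = '\t'): (String, String) = {
    val pos = line.indexOf(separator)
    if (pos == -1) (line, "") else (line.substring(0, pos), line.substring(pos + 1))
  }
}
```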

HadoopInputFormat

In Spark, we can use Hadoop InputFormats to read data, just as a MapReduce job would. Spark exposes these Hadoop APIs in Java, Scala, and Python.

Now, let’s look at a demo using Hadoop input formats in Spark.

Spark supports both the old and the new Hadoop APIs:

Old APIs (for the org.apache.hadoop.mapred package)

  • hadoopRDD
  • hadoopFile

New APIs (for the org.apache.hadoop.mapreduce package)

  • newAPIHadoopRDD
  • newAPIHadoopFile

These methods are called on the SparkContext, as shown below.

Old APIs:

 SparkContext.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions)

Here’s the explanation of the above line:

  • conf – an org.apache.hadoop.mapred.JobConf. Because hadoopRDD takes no path argument, the input path must be set in this configuration.
  • inputFormatClass – the Hadoop input format class.
  • keyClass – the input key class.
  • valueClass – the input value class.
  • minPartitions – the minimum number of partitions (optional).
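As a sketch of the old-API call, assuming Spark and the Hadoop client libraries are on the classpath, the helper below stores the input path in the JobConf with FileInputFormat.setInputPaths (since hadoopRDD has no path parameter) and reads the file with the old-API TextInputFormat. OldApiHadoopRDD and its methods are hypothetical names.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.SparkContext

object OldApiHadoopRDD {
  // hadoopRDD has no path parameter, so the input path is stored in the JobConf.
  def jobConfFor(base: Configuration, inputPath: String): JobConf = {
    val jobConf = new JobConf(base)
    FileInputFormat.setInputPaths(jobConf, inputPath)
    jobConf
  }

  // Returns (byte offset, line) pairs read through the old-API TextInputFormat.
  def readLines(sc: SparkContext, inputPath: String): Array[(Long, String)] = {
    val conf = jobConfFor(sc.hadoopConfiguration, inputPath)
    sc.hadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      // The RecordReader reuses Writable instances, so copy them into plain values first.
      .map { case (offset, line) => (offset.get, line.toString) }
      .collect()
  }
}
```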
The second old-API method, hadoopFile, takes the input path directly:

 SparkContext.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)

The above line can be explained like so:

  • path – the input file path, passed directly as an argument.
  • inputFormatClass – the Hadoop input format class.
  • keyClass – the input key class.
  • valueClass – the input value class.
  • minPartitions – the minimum number of partitions (optional).
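A comparable sketch for hadoopFile, under the same classpath assumptions, reads tab-separated lines with the old-API org.apache.hadoop.mapred.KeyValueTextInputFormat. No JobConf has to be built by hand because the path is an argument. OldApiHadoopFile.readPairs is a hypothetical helper.

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.KeyValueTextInputFormat
import org.apache.spark.SparkContext

object OldApiHadoopFile {
  // Reads "key<TAB>value" lines with the old-API (mapred) KeyValueTextInputFormat.
  // Spark builds the JobConf internally from sc.hadoopConfiguration and the path.
  def readPairs(sc: SparkContext, inputPath: String, minPartitions: Int = 2): Array[(String, String)] =
    sc.hadoopFile(inputPath, classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text], minPartitions)
      .map { case (k, v) => (k.toString, v.toString) } // copy out of the reused Text objects
      .collect()
}
```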
New APIs:

 SparkContext.newAPIHadoopRDD(conf, fClass, kClass, vClass)

Here, conf is an org.apache.hadoop.conf.Configuration. As with hadoopRDD, there is no path argument, so the input path must be set in this configuration.

  • fClass is the Input format class
  • kClass is the input key class
  • vClass is the input value class
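For newAPIHadoopRDD, the input directory likewise has to live in the Configuration. This sketch assumes the Hadoop 2.x property name mapreduce.input.fileinputformat.inputdir (the key used by the new-API FileInputFormat) and reads the file with the new-API TextInputFormat; NewApiHadoopRDD is a hypothetical name.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkContext

object NewApiHadoopRDD {
  // newAPIHadoopRDD takes no path, so the Configuration carries the input directory.
  def readLines(sc: SparkContext, inputPath: String): Array[(Long, String)] = {
    val conf = new Configuration(sc.hadoopConfiguration) // copy; leave the shared conf untouched
    conf.set("mapreduce.input.fileinputformat.inputdir", inputPath) // assumed Hadoop 2.x key
    sc.newAPIHadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
      // Copy out of the reused Writable objects before collecting.
      .map { case (offset, line) => (offset.get, line.toString) }
      .collect()
  }
}
```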
 SparkContext.newAPIHadoopFile(path, fClass, kClass, vClass, conf)

Here, path is the input file path, and the optional conf is an org.apache.hadoop.conf.Configuration.

  • fClass is the Input format class
  • kClass is the input key class
  • vClass is the input value class
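The optional Configuration of newAPIHadoopFile is a convenient place to tune the input format. The sketch below assumes the Hadoop 2.x property mapreduce.input.keyvaluelinerecordreader.key.value.separator, which the record reader of KeyValueTextInputFormat consults (its default is a tab); NewApiHadoopFile.readPairs is a hypothetical helper.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.spark.SparkContext

object NewApiHadoopFile {
  // Reads "key<sep>value" lines; only the first character of `separator` is used.
  def readPairs(sc: SparkContext, inputPath: String, separator: String = "\t"): Array[(String, String)] = {
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", separator)
    sc.newAPIHadoopFile(inputPath, classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text], conf)
      .map { case (k, v) => (k.toString, v.toString) } // copy out of the reused Text objects
      .collect()
  }
}
```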

Now, let’s look at a demo using one input file. Each line of the input holds a name and a salary, separated by a tab:

  • Manjunath	50,000
  • Kiran	40,0000
  • Onkar	45,0000
  • Prateek	45,0000

Now, we will read this data with KeyValueTextInputFormat.

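import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
val input = sc.newAPIHadoopFile("/home/kiran/Hadoop_input", classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text])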


These methods take java.lang.Class objects as arguments. In Scala, classOf[ClassName] returns the Class object for a class, the equivalent of ClassName.class in Java.

Here, KeyValueTextInputFormat and Text are Hadoop classes written in Java. We could also pass our own custom input format classes in their place, which will be discussed in our next post.

[Screenshot: Input data]

HadoopOutputFormat

Let’s look at Hadoop output formats and how to use them in Spark. By default, Hadoop uses TextOutputFormat, which writes each key and value on one line of the part file, separated by a tab.
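To make the output layout concrete, here is a pure-Scala model of the line TextOutputFormat writes for one record. It is an illustration, not Hadoop code, and TextOutputLine is a hypothetical name. It assumes the Hadoop 2.x behavior: the separator comes from mapreduce.output.textoutputformat.separator (default: tab), and a null or NullWritable key or value is simply left out.

```scala
// Illustrative model (not Hadoop code) of one line written by TextOutputFormat.
// None stands for a null or NullWritable key/value; the trailing "\n" is omitted.
object TextOutputLine {
  def format(key: Option[String], value: Option[String], separator: String = "\t"): Option[String] =
    (key, value) match {
      case (None, None) => None // nothing at all (not even a newline) is written
      // The separator appears only when both key and value are present.
      case _ => Some(Seq(key, value).flatten.mkString(separator))
    }
}
```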

Spark supports this too, with methods for both the old (mapred) and the new (mapreduce) Hadoop output APIs.

Old (mapred) API: saveAsHadoopFile

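This method is called on an RDD of key-value pairs (it comes from PairRDDFunctions), not on the SparkContext:

 rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf, codec)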

The explanation of the above line is as follows:

  • path – the output directory where the part files will be written.
  • keyClass – the output key class.
  • valueClass – the output value class.
  • outputFormatClass – the output format class.
  • conf – the Hadoop configuration, an org.apache.hadoop.mapred.JobConf.
  • codec – the compression codec class (for example, GzipCodec), wrapped in an Option in this signature.
  • conf and codec are optional parameters.
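A minimal sketch of the old-API save, assuming Spark and the Hadoop client libraries are on the classpath: String pairs are converted to Text, written with the old-API TextOutputFormat, and optionally gzip-compressed through the overload that takes a codec class. OldApiSave is a hypothetical name.

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.rdd.RDD

object OldApiSave {
  // Writes (key, value) pairs as "key<TAB>value" lines with the old-API (mapred) TextOutputFormat.
  def save(pairs: RDD[(String, String)], outputDir: String, compress: Boolean = false): Unit = {
    // saveAsHadoopFile comes from PairRDDFunctions, so it is called on an RDD of pairs.
    val textPairs = pairs.map { case (k, v) => (new Text(k), new Text(v)) }
    val formatClass = classOf[TextOutputFormat[Text, Text]]
    if (compress) {
      // This overload takes a codec class and writes gzip-compressed part files.
      textPairs.saveAsHadoopFile(outputDir, classOf[Text], classOf[Text], formatClass, classOf[GzipCodec])
    } else {
      // conf and codec fall back to their defaults (Spark's Hadoop configuration, no compression).
      textPairs.saveAsHadoopFile(outputDir, classOf[Text], classOf[Text], formatClass)
    }
  }
}
```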
New (mapreduce) API: saveAsNewAPIHadoopFile

 rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)

The explanation of the above line is as follows:

  • path – the output directory where the part files will be written.
  • keyClass – the output key class.
  • valueClass – the output value class.
  • outputFormatClass – the output format class.
  • conf – the Hadoop configuration, an org.apache.hadoop.conf.Configuration (optional).
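The new-API save can be sketched the same way. Here the optional Configuration changes the key/value separator, assuming the Hadoop 2.x property mapreduce.output.textoutputformat.separator read by the new-API TextOutputFormat; NewApiSave is a hypothetical name.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd.RDD

object NewApiSave {
  // Writes pairs with the new-API (mapreduce) TextOutputFormat and a configurable separator.
  def save(pairs: RDD[(String, String)], outputDir: String, separator: String = "\t"): Unit = {
    // Copy Spark's Hadoop configuration so the change does not leak into other jobs.
    val conf = new Configuration(pairs.sparkContext.hadoopConfiguration)
    conf.set("mapreduce.output.textoutputformat.separator", separator)
    pairs
      .map { case (k, v) => (new Text(k), new Text(v)) }
      .saveAsNewAPIHadoopFile(outputDir, classOf[Text], classOf[Text],
        classOf[TextOutputFormat[Text, Text]], conf)
  }
}
```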

In place of the built-in key, value, and output format classes, we can also define and pass our own custom classes.
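As one illustration of a custom output format, the hypothetical class below builds on the old-API MultipleTextOutputFormat to route each record to a sub-directory named after its key and write only the value. It is a sketch of a common Spark pattern, assuming the Hadoop mapred libraries are on the classpath.

```scala
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Hypothetical custom format built on the old-API MultipleTextOutputFormat:
// every record goes to <outputDir>/<key>/<part-file>, and only the value is written.
class KeyAsDirectoryOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // `name` is the part-file name assigned to the task (e.g. part-00000).
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    s"$key/$name"

  // A NullWritable key makes the underlying TextOutputFormat write just the value.
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
}
```

It is passed like any built-in format class, for example pairs.saveAsHadoopFile(outputDir, classOf[String], classOf[String], classOf[KeyAsDirectoryOutputFormat]), where pairs is an RDD[(String, String)].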

Now, let’s save the data we read with the Hadoop input format above, this time using a Hadoop output format. You can refer to the screenshot below.

[Screenshot: Hadoop's old API output format]

In the above screenshot, you can see that we have saved the output using the old Hadoop API, i.e., the mapred libraries.

Now, we will save the same data using the new Hadoop API, i.e., the mapreduce libraries. You can refer to the screenshot below.

[Screenshot: Hadoop's new API output format]

This is how output is saved in Spark using Hadoop output formats.

We hope this post has helped you understand how to work with Hadoop input and output formats in Spark. If you have any queries, feel free to drop us a comment and we will get back to you at the earliest.


Opinions expressed by DZone contributors are their own.
