
How to Process Nasty Fixed Width Files Using Apache Spark

Fixed width files can be a pain. Learn how to get around the issues they present using Scala and Spark.


Fixed width files are a very common flat-file format when working with SAP, Mainframe, and web log data. Converting that data into a DataFrame using metadata is a recurring challenge for Spark developers. This article covers the typical problems a developer faces when working with a fixed width file, and the solution shown here is generic to any fixed width layout and easy to implement. Because each line is split with a foldLeft over the column widths, lines in the RDD that are shorter than the declared record length are handled safely instead of throwing exceptions.
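Before diving into the files, here is a minimal sketch of that core idea, using made-up widths and data, and take/drop instead of the substring calls used in the full code later: fold over the column widths, peeling a prefix of each width off the line; once the line runs out, the remaining columns simply come back as empty strings.

// Minimal sketch of the foldLeft idea with made-up widths and data.
// take/drop never throw, so a line shorter than the declared record
// width just yields empty strings for the missing trailing columns.
val widths = List(3, 5, 4)
val fields = widths.foldLeft(("ABC12345", List.empty[String])) {
  case ((rest, cols), w) => (rest.drop(w), rest.take(w).trim :: cols)
}._2.reverse
// fields == List("ABC", "12345", "")  -- the third column is absent from the input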

Let's check the source file first and then the metadata file:

 10597    Subhasish Guha                                    subhasish.iot@gmail.com                 27           TXNPUES                    Yes          5007.10
191675    Ritam Mukherjee                                   ritam@gmail.com                         29 SINGLE    OUNHQEX XUFQONY            No 01/14/20133172.53   43537.00  
          Saurav Agarwal                                    agarwalsaurav@gmail.com                 30 MARRIED                                           100000.00 7000.00
 80495    Priyanshu Kumar                                   kumarpriyanshu@gmail.com                45 MARRIED   XDZANTV                    No 11/21/2012          

The source file has multiple issues:

  1. Trailing fields are not padded with spaces to their full width, so many lines are shorter than the declared record length (a quick sketch of why this breaks a naive split follows this list).

  2. On some rows, the last two fields are missing entirely.

  3. Some fields in the middle of a record are empty.
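Here is that sketch, with hypothetical widths and a truncated line: a naive split that hard-codes start and end positions throws as soon as a line is shorter than the layout says it should be.

// Hypothetical example: a naive position-based split assumes every line is
// exactly as wide as the metadata says, and blows up on truncated records.
val widths = List(10, 50, 40)                    // account_id, name, mail
val short  = " 80495    Priyanshu Kumar"         // a truncated line, 25 characters

short.substring(0, 10)                           // " 80495    " -- fine
// short.substring(10, 60)                       // StringIndexOutOfBoundsException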

The metadata file looks like the following:

col_name,size
account_id,10
name,50
mail,40
age,3
status,10
desc,27
cflow,3
process_date,10
debit_amt,10
credit_amt,10
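As a quick arithmetic check on this metadata (a sketch, not part of the article's code): the widths add up to 173 characters, which is how wide a fully padded record would be. Any line with missing trailing padding or missing fields comes in shorter than that, and that is exactly what the splitter has to tolerate.

// Total width of a fully padded record according to the metadata above.
val widths = List(10, 50, 40, 3, 10, 27, 3, 10, 10, 10)
widths.sum  // 173 -- truncated lines in the source file are shorter than this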

Now let's look at the generic code that loads this data into a DataFrame:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FixedWidth {

  // Split a fixed width line into trimmed fields, driven by the list of column
  // widths. foldLeft carries the unconsumed remainder of the line; when the line
  // runs out before the widths do, the remaining fields become empty strings
  // instead of throwing StringIndexOutOfBoundsException.
  def lsplit(pos: List[Int], str: String): Row = {
    val (_, result) = pos.foldLeft((str, List[String]())) {
      case ((s, res), curr) =>
        if (s.length() <= curr) {
          // The line ends inside (or exactly at) this column: take whatever is left.
          val split = s.trim()
          val rest = ""
          (rest, split :: res)
        } else {
          // The line is longer than this column: cut the field and keep the remainder.
          val split = s.substring(0, curr).trim()
          val rest = s.substring(curr)
          (rest, split :: res)
        }
    }
    // Fields were prepended, so reverse them to restore the original column order.
    Row.fromSeq(result.reverse)
  }

  def main(args: Array[String]): Unit = {
    // winutils is only needed when launching from a Windows machine.
    System.setProperty("hadoop.home.dir", "C://winutils//")
    val spark = SparkSession.builder()
      .master("yarn")
      .config("spark.sql.warehouse.dir", "file:///C://Personalized//")
      .getOrCreate()

    // Raw fixed width lines and the metadata describing the column layout.
    val rdd = spark.sparkContext.textFile("path of your file")
    val metadata = spark.read.option("header", "true").csv("path of your Metadata File")

    // Column names and widths from the metadata file.
    val header = metadata.select("col_name").rdd.map(x => x.getString(0).trim()).collect()
    val sizeOfColumn = metadata.select("size").rdd.map(x => x.getString(0).trim()).collect().map(_.toInt).toList

    // Build a string-typed schema from the column names.
    val fields = header.map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // Split every line according to the widths and wrap the result in a DataFrame.
    val df = spark.createDataFrame(rdd.map { x => lsplit(sizeOfColumn, x) }, schema)
    df.show(false)
  }
}
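As a quick sanity check (a hypothetical call with made-up widths and a made-up short line, not part of the article's run), lsplit fills the missing trailing fields with empty strings instead of failing:

// Hypothetical check with made-up widths and a short input line:
// fields past the end of the line come back as empty strings.
val sizes = List(4, 6, 8)
FixedWidth.lsplit(sizes, "1001Alice")
// -> Row("1001", "Alice", "")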

The output of this code is shown below. I hope this helps all the developers who handle this kind of file and run into these problems. This code takes care of almost all of the discrepancies you are likely to face in fixed width files.

+----------+---------------+------------------------+---+-------+---------------+-----+------------+---------+----------+
|account_id|name           |mail                    |age|status |desc           |cflow|process_date|debit_amt|credit_amt|
+----------+---------------+------------------------+---+-------+---------------+-----+------------+---------+----------+
|10597     |Subhasish Guha |subhasish.iot@gmail.com |27 |       |TXNPUES        |Yes  |            |5007.10  |          |
|191675    |Ritam Mukherjee|ritam@gmail.com         |29 |SINGLE |OUNHQEX XUFQONY|No   |01/14/2013  |3172.53  |43537.00  |
|          |Saurav Agarwal |agarwalsaurav@gmail.com |30 |MARRIED|               |     |            |100000.00|7000.00   |
|80495     |Priyanshu Kumar|kumarpriyanshu@gmail.com|45 |MARRIED|XDZANTV        |No   |11/21/2012  |         |          |
+----------+---------------+------------------------+---+-------+---------------+-----+------------+---------+----------+
Topics:
apache spark, scala tutorial, big data, apache spark tutorial scala, apache spark tutorial

