
Drilling Into Big Data — Data Preparation

We have already introduced you to Hadoop and clusters in our previous article. At this point, you may be wondering why we need a multi-node cluster. Since most of the services will be running on the master, why don't we just create a single-node cluster instead?

There are two main reasons for using a multi-node cluster. First, the amount of data to be stored and processed can be too large for a single node to handle. Second, the computational power of a single-node cluster can be limited.


Data Preparation

As discussed in the previous articles, we collect data for a reason, but so far we have left it as is, without any analysis. For a business, there is little value in obtaining data and simply keeping it as it is. Data preparation means preparing or transforming raw data into refined information that can be used effectively for various business purposes and analyses.

Our ultimate goal is to turn data into information and information into insight, which can help in many aspects of decision making and business improvement. Data processing, or preparation, is not a new idea; it has existed since the early days, when processing was done manually. Now that data has become big, it is time to process it by automatic means to save time and achieve better accuracy.

If you search for the top five big data processing frameworks, this list of names keeps coming up:

  • Hadoop.
  • Spark.
  • Storm.
  • Flink.
  • Samza.

The first two of the five frameworks are the best known and the most widely implemented across projects, and both are primarily batch processing frameworks. They may seem similar, but there are significant differences between the two. Let's take a quick comparative look.

Criteria        | Spark                                      | Hadoop MapReduce
Processing      | In-memory                                  | Persists on disk after map and reduce functions
Ease of use     | Easy due to support for Scala and Python   | Tough, as only Java is supported
Speed           | Runs applications up to 100 times faster   | Slower
Latency         | Lower                                      | Higher
Task scheduling | Schedules tasks by itself                  | Requires external schedulers

According to the table, several factors make the jump from MapReduce to Spark attractive. Another simple reason is ease of use: Spark comes with user-friendly APIs for Scala, Java, and Python, plus Spark SQL. Spark SQL is close to SQL-92, so it is easy even for beginners (a short word-count sketch follows the feature list below). Some of the key features that make Spark a strong big data processing engine are:

  • Equipped with MLlib.
  • Supports multiple languages like Scala, Python, and Java.
  • A single library is capable of performing SQL, graph analytics, and streaming.
  • Stores data in the servers' RAM, which makes access easier and analytics faster.
  • Real-time processing.
  • Compatible with Hadoop (works independently or on top of Hadoop).
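To make the ease-of-use point concrete, here is a minimal word-count sketch in PySpark; the input path is a placeholder, and the same job in classic MapReduce would require a full Java Mapper/Reducer implementation.

from pyspark.sql import SparkSession

# Start (or reuse) a SparkSession; in the pyspark shell one already exists as `spark`.
spark = SparkSession.builder.appName("WordCountSketch").getOrCreate()

# Read a text file (placeholder path), split lines into words, and count each word.
lines = spark.read.text("hdfs:///tmp/sample.txt")          # hypothetical input path
words = lines.rdd.flatMap(lambda row: row.value.split())
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)

for word, count in counts.take(10):
    print(word, count)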

Spark Over Storm

We compared the first two and arrived at a choice. Sometimes people prefer the third stack, Storm. Both Spark and Storm are common stacks for real-time processing and analytics. Storm is a pure streaming framework, but because it is a smaller framework, many features such as MLlib are not available in it.

Spark is often preferred over Storm for concerns like scaling services up and down. It is worth knowing the differences so you can switch between tools based on the requirement. In this article, we will focus on Spark, a widely used processing tool.

Components of Spark

The Apache Spark ecosystem includes the following components:

  • Spark Core — The base of the ecosystem; a general execution engine used for dispatching and scheduling.
  • Spark SQL — A component on top of Spark Core that introduces a set of data abstractions called Schema RDDs and supports both structured and semi-structured data (a short sketch after this list shows Spark Core and Spark SQL in use).
  • Spark Streaming — Enables fault-tolerant, real-time processing of live data streams and provides an API to manipulate those streams.
  • MLlib (Machine Learning Library) — Apache Spark ships with MLlib, which contains a wide array of machine learning algorithms, collaborative filtering, and more.
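As a rough sketch of how these components fit together (assuming a running pyspark shell and made-up data), the snippet below touches Spark Core through an RDD and Spark SQL through a DataFrame and a temporary view:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ComponentsSketch").getOrCreate()
sc = spark.sparkContext                                     # Spark Core entry point

# Spark Core: a plain RDD transformation (hypothetical data)
rdd = sc.parallelize([("Texas", 12), ("Ohio", 7), ("Texas", 3)])
totals = rdd.reduceByKey(lambda a, b: a + b)

# Spark SQL: the same data as a DataFrame with a schema, queried through SQL
df = spark.createDataFrame(rdd, ["State", "MuseumCount"])
df.createOrReplaceTempView("museums")
spark.sql("select State, sum(MuseumCount) as Total from museums group by State").show()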

Applications

Some applications of Apache Spark are:

  • Machine Learning – Apache Spark ships with a machine learning library, MLlib, that can perform advanced analytics such as clustering and classification (a short clustering sketch follows this list).
  • Event detection – Spark Streaming lets an organization keep track of rare and unusual behaviors to protect its systems.
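To illustrate the MLlib point, here is a minimal clustering sketch using the pyspark.ml API, assuming a running pyspark shell; the column names, sample values, and the number of clusters are made up for the example.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Hypothetical DataFrame of museums with two numeric visitor-count columns
data = spark.createDataFrame(
    [("Museum A", 120.0, 30.0), ("Museum B", 15.0, 2.0), ("Museum C", 110.0, 25.0)],
    ["MuseumName", "Families_Count", "Solo_Count"])

# Assemble the numeric columns into the single 'features' vector MLlib expects
assembler = VectorAssembler(inputCols=["Families_Count", "Solo_Count"], outputCol="features")
features = assembler.transform(data)

# Cluster the museums into two groups and show the assignment
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(features)
model.transform(features).select("MuseumName", "prediction").show()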

Spark Compatibility

  • File formats — Spark supports all the file formats supported by Hadoop, from unstructured (plain text) to structured (such as SequenceFiles). As discussed earlier, choosing an appropriate file format can result in better performance.
  • File systems — Local, Amazon S3, HDFS, etc. (see the short sketch after this list).
  • Databases — Supports many, such as Cassandra, HBase, and Elasticsearch, with the help of Hadoop connectors and custom Spark connectors.
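As a sketch of this compatibility, reading from different storage layers looks almost identical; all paths and bucket names below are placeholders, and S3 access assumes the hadoop-aws connector and credentials are configured.

# Local file system
local_df = spark.read.option("header", "true").csv("file:///tmp/tripadvisor_merged.csv")

# HDFS
hdfs_df = spark.read.option("header", "true").csv("hdfs:///user/demo/tripadvisor_merged.csv")

# Amazon S3 (needs the hadoop-aws connector and credentials configured)
s3_df = spark.read.option("header", "true").csv("s3a://my-bucket/tripadvisor_merged.csv")

# A structured format such as Parquet is read the same way
parquet_df = spark.read.parquet("hdfs:///user/demo/tripadvisor_merged.parquet")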

Usage

Spark is a powerful tool that provides an interactive shell for analyzing data. The points below cover opening, using, and closing the spark-shell.

Opening Spark Shell

Spark is built in Scala, so the default interactive shell is Scala-based. Type the following command to start the spark-shell.

$ spark-shell


If the Spark shell opens successfully, the startup output ends with the line "Spark context available as sc," which means Spark has automatically created a SparkContext object with the name sc. If it is not there, create a SparkContext object before starting.


Now you are all set to carry on with Scala programs.

Press "Ctrl+z" to come out of spark-shell if needed.

Spark Context

The SparkContext can connect to several types of cluster managers, which can allocate resources across applications. Let me show two different scenarios with two different languages.

Let's find the museum count by state from the ingested data using Scala, and write the output back into Hadoop as a CSV file.

Scala

import java.io._
import scala.Array._
import scala.io._
import java.io.BufferedOutputStream
import java.io.FileOutputStream
import java.io.InputStream
import java.io.OutputStream
import java.util.Calendar
import java.lang._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configured
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.util.Tool
import org.apache.hadoop.util.ToolRunner
import org.apache.spark.sql.SparkSession
//import com.databricks.spark.avro._

object museum {
  def main(args: Array[String]) {
    println(Calendar.getInstance().getTime())
    var cols = ""

    // Create a SparkSession and read the ingested CSV from HDFS
    val spark1 = org.apache.spark.sql.SparkSession.builder
      .master("local")
      .appName("Spark Avro Reader")
      .getOrCreate
    var df1 = spark1.read.format("com.databricks.spark.csv")
      .option("header", "True")
      .option("escape", "\"")
      .load("hdfs://emr-header-1.cluster-95904:9000/user/demo/tripadvisor_merged.csv")
      .coalesce(1)

    // Register a temporary view and compute the museum count per state
    df1.createOrReplaceTempView("museum")
    val df2 = spark1.sql("select State, count(MuseumName) Museum_Count from museum group by State")

    // Write the result back to HDFS as a CSV file with a header row
    var flag = 0
    df2.foreachPartition(itr => {
      val conf = new Configuration()
      conf.set("fs.defaultFS", "hdfs://emr-header-1.cluster-95904:9000")
      val fs = FileSystem.get(conf)
      val output = fs.create(new Path("/user/ogs/etl/processed/MUSEUM_COUNT_BY_STATE/MUSEUM_COUNT_BY_STATE.csv"))
      val pw1 = new PrintWriter(output)
      if (flag == 0) {
        cols = "State" + "," + "Count" + "\n"
        pw1.write(cols)
        flag = 1
      }
      while (itr.hasNext) {
        val item = itr.next().toString()
        val l = item.length
        cols = item.toString().substring(1, l - 1)
        cols = cols.concat("\n")
        pw1.write(cols)
        // println(cols)
      }
      pw1.close()
    })
  }
}

Here, Spark reads this file as a comma-separated file. But a column named Address in this sheet contains commas itself, so to avoid splitting those values into different columns, we use the "escape" option. Scala depends on Java, hence the need to import various libraries. Let's make this shorter using pyspark.

Before initiating Spark with Python, install the needed libraries. Here I am installing pandas, which is used for efficient file handling.

pip install pandas

Now initiate the shell using "pyspark" command.


Let's find the top 10 museums by visitor count. The following code makes use of Spark SQL and the conventions of the pyspark shell. You could also use a pandas DataFrame to read and process the file, but using Spark's own readers and writers is generally more efficient.

Python

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Read the ingested CSV from HDFS and register it as a temporary view
df = spark.read.format("com.databricks.spark.csv") \
    .option("header", "True") \
    .option("escape", "\"") \
    .load("hdfs://emr-header-1.cluster-95904:9000/user/demo/sqoop/tripadvisor_merged.csv")
df.createOrReplaceTempView("family")

# Rank the museums for each visitor type and keep the top entries
df1 = spark.sql("select MuseumName, Families_Count Count from (select MuseumName, Families_Count, rank() over(order by length(Families_Count) desc, Families_Count desc) rank from family) where rank <= 30").withColumn("Visitor_Type", lit("Families_Count"))
df2 = spark.sql("select MuseumName, Couples_Count Count from (select MuseumName, Couples_Count, rank() over(order by length(Couples_Count) desc, Couples_Count desc) rank from family) where rank <= 10").withColumn("Visitor_Type", lit("Couples_Count"))
df3 = spark.sql("select MuseumName, Solo_Count Count from (select MuseumName, Solo_Count, rank() over(order by length(Solo_Count) desc, Solo_Count desc) rank from family) where rank <= 10").withColumn("Visitor_Type", lit("Solo_Count"))
df4 = spark.sql("select MuseumName, Business_Count Count from (select MuseumName, Business_Count, rank() over(order by length(Business_Count) desc, Business_Count desc) rank from family) where rank <= 10").withColumn("Visitor_Type", lit("Business_Count"))
df5 = spark.sql("select MuseumName, Friends_Count Count from (select MuseumName, Friends_Count, rank() over(order by length(Friends_Count) desc, Friends_Count desc) rank from family) where rank <= 10").withColumn("Visitor_Type", lit("Friends_Count"))

# Merge the per-visitor-type results and write them out as CSV on HDFS
df6 = df1.unionAll(df2).unionAll(df3).unionAll(df4).unionAll(df5)
df6.write.csv('/user/demo/spark/top_museums_by_count.csv')

We can also save this script with a .py extension and submit the application using spark-submit. Since we had several counts to find, we created separate DataFrames and merged them using a union. Note that the counts are stored as strings, so a normal sort orders them by their first digit rather than by their numeric value. So the result will be something like below:

Screenshot: the resulting DataFrame output in the pyspark shell.

In this case, include the length of the column in the ordering as well to get exact results.

For example,

Python

df1 = spark.sql("select MuseumName, Families_Count from family order by length(Families_Count) desc, Families_Count desc")



Once done, write the Spark DataFrame out as a CSV file. The default behavior is to save the output as multiple part-* files in the provided path. Let's query the folder we wrote back to. You can see "top_museums.csv", which is not a CSV file but a directory in which your output is saved in multiple parts. This folder structure plays a major role in distributed storage and processing.

Screenshot: listing the output directory on HDFS as hadoop@emr-header-1.

Suppose I have to save a DataFrame with:

  • A path that maps to an exact file name instead of a folder.
  • The output written as a single file instead of multiple files.

Then coalesce the DataFrame to a single partition and save the file.
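Here is a minimal sketch of that approach, reusing df6 from the earlier example and placeholder paths: coalesce to one partition, write, and then rename the single part file if an exact file name is needed.

# Collapse to a single partition so Spark writes exactly one part-* file
df6.coalesce(1).write.mode("overwrite").csv("/user/demo/spark/tmp_top_museums", header=True)

# Spark still creates a directory; to get an exact file name, rename the single
# part file afterwards, for example from the command line:
#   hdfs dfs -mv /user/demo/spark/tmp_top_museums/part-*.csv /user/demo/spark/top_museums.csv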

Benefits of Spark on Alibaba Cloud

Adaptive Execution

Spark SQL on Alibaba Cloud supports adaptive execution, which sets the number of reduce tasks automatically and resolves data skew by itself. By setting a range for the shuffle partition number, the adaptive execution framework of Spark SQL can dynamically adjust the number of reduce tasks at different stages of different jobs.

Data Skew

Data skew refers to the scenario where certain tasks have far more data to process than others. Plain Spark SQL does not optimize for skewed data, but the adaptive execution framework can automatically detect skewed data and perform run-time optimizations.
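As a rough illustration, these are the corresponding switches in open-source Spark 3.x; the exact property names and defaults on Alibaba Cloud E-MapReduce may differ, so treat this as a sketch rather than the platform's documented configuration.

# Enable adaptive query execution and let Spark choose the number of reduce tasks
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Bound the shuffle partition range the framework can pick from
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")

# Detect and split skewed partitions in joins at run time
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")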

Best Practices

  • Do not collect large RDDs to the driver, and prefer the default Dataset API over RDDs where possible.
  • Avoid UDFs and replace them with built-in Spark SQL functions (see the sketch after this list).
  • When executing HDFS read/write jobs, keep the number of concurrent tasks per executor at five or fewer for reading and writing.
  • A general tip is to look at the execution time and tune the job accordingly.
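For example, here is a small sketch of the UDF guideline, reusing the df DataFrame from the earlier pyspark example and a made-up output column: the built-in function stays inside Spark's optimizer, while the Python UDF forces row-by-row serialization.

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Slower: a Python UDF that Spark's optimizer cannot see into
upper_udf = F.udf(lambda s: s.upper() if s else None, StringType())
slow_df = df.withColumn("MuseumNameUpper", upper_udf(F.col("MuseumName")))

# Faster: the equivalent built-in Spark SQL function
fast_df = df.withColumn("MuseumNameUpper", F.upper(F.col("MuseumName")))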

Hope you enjoyed learning Spark. Our next steps will be to explore creating and submitting various jobs using the Alibaba Cloud UI, as well as performing querying and analysis. In the next article, we will walk you through the basics of Hive, including table creation and other underlying concepts for big data applications.

"The goal is to turn data into information and information into insight," — Carly Fiorina



