Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Fast Data Access Using GemFire and Apache Spark (Part 1): Introduction

DZone's Guide to

Fast Data Access Using GemFire and Apache Spark (Part 1): Introduction

An introductory tutorial on Pivotal's GemFire framework and how it can work in tandem with Apache Spark in you microservices.

· Microservices Zone ·
Free Resource

Record growth in microservices is disrupting the operational landscape. Read the Global Microservices Trends report to learn more.

This is the first article of the series, where we will be talking about the basic concepts of the Apache Spark and GemFire, and setting up and establishing a connection.

Future series:

  • Microservices with GemFire and loading data into a batch using Apache Spark and Hive.
  • Microservices with GemFire using CQRS, getting data into Hive and Apache Spark, and streaming sync data into Gemfire.

GemFire:

"Pivotal GemFire is a distributed data management platform. Pivotal GemFire is designed for many diverse data management situations but is especially useful for high-volume, latency-sensitive, mission-critical, transactional systems, high-performance, real-time apps." - VMWare.com

Apache Spark:

"Apache Spark is an open-source distributed general-purpose cluster computing framework with (mostly) in-memory data processing engine that can do ETL, analytics, machine learning and graph processing on large volumes of data at rest (batch processing) or in motion (streaming processing) with rich concise high-level APIs for the programming languages: Scala, Python, Java, R, and SQL." - Jacek Laskowski

Microservice:

Microservices are small, autonomous services that work together.

Big Data vs. Fast Data:

Big Data is data stored in a data lake and in rest, size in petabytes, and best fit to store into HDFS, HBase or cloud storage as required lots of space on disk. Fast data is to smaller data sets used in near-real or real-time in order to solve a problem. Microservices require fast data and a grander scheme of things.

GemFire Connection with Spark:

Prerequisite: Java 8 , basic knowledge of GemFire and Apache Spark,Window 7 or above .

Operating System: Windows (64-bit)

1.Download GemFire Apache spark connector.

2 Download Gradle

3. Download GemFire

4. Download Apache Spark

5. Download Winutils

Configure Spark:

  • Make sure JAVA_HOME is set into env variable.
  • Unzip Spark 2.4, make sure no space in between folder name.
  • Configure Window "SPARK_HOME" in env variable.

  • Add env path and give SPARK_HOME \bin
  • Create a Hadoop folder and inside bin copy "winutils.exe"
  • Add env HADOOP_HOME=C:\hadoop
  • Go to <spark-root-dir>/conf/spark-defaults.conf and add
spark.geode.locators=localhost[55221]
  • Go to <spark-root-dir>/conf/conf/log4j.properties
log4j.rootCategory=WARN, console
  • Open cmd and type spark-shell

Configure GemFire

Now we are going to configure GemFire

  • Unzip GemFire and go to root folder/bin "Gemfire 9.2.2\bin"
  • Double click on gfsh.bat and see gfsh terminal, spend some time to learn GemFire in 15 minutes.
  • Now run the following command on the gfsh terminal:
start locator --name=locator1 --port=55221

start server --name=server1 --locators=localhost[55221] --server-port=0

start server --name=server2 --locators=localhost[55221] --server-port=0

Create Region

GemFire has two different region types: replicated and partitioned regions. A replicated region has a full dataset on each server, while a partitioned region has its dataset spanning multiple servers, and may have duplicates for high availability.

create region --name=str_str_region --type=PARTITION --key-constraint=java.lang.String --value-constraint=java.lang.String

create region --name=int_str_region --type=PARTITION --key-constraint=java.lang.Integer --value-constraint=java.lang.String


# Make sure region created 

list regions

Unzip Gradle and set env variable GRADLE_HOME=<path>

  • Now unzip "geode-spark-connector-develop" and go to root dir and run following command "./gradlew clean build -x test" and inside build folder you can see following jar files .
  1. geode-functions-1.0.0.jar
  2. geode-spark-connector.jar
  • Deploy Spark Geode Connector's function geode-function jar
deploy --jar=<path to connector project>/geode-functions/build/libs/geode-functions-1.0.0.jar

list deployed

We are all set to connect Apache Spark and GemFire. Run the following command; I have configured it to work in the Windows command terminal (change jar path in command as needed):

spark-shell --master local[*] --jars C:\Users\vaquarkhan\Documents\code\geode-dependencies.jar,C:\Users\vaquarkhan\Documents\code\geode-spark-connector.jar,C:\Users\vaquarkhan\Documents\code\geode-core-9.2.2.jar,C:\Users\vaquarkhan\Documents\code\log4j-api-2.8.2.jar,C:\Users\vaquarkhan\Documents\code\log4j-core-2.8.2.jar,C:\Users\vaquarkhan\Documents\code\commons-validator-1.6.jar,C:\Users\vaquarkhan\Documents\code\fastutil-7.1.0.jar,C:\Users\vaquarkhan\Documents\code\shiro-core-1.3.2.jar

  • Check the geode.locators property in the Spark shell:
sc.getConf.get("spark.geode.locators")
  • Next, we need to import a connector to Spark:
import org.apache.geode.spark.connector._
  • Create Apache Spark RDD and save it to GemFire. Then, fetch it from Apache Spark:
//example 1

val data = Array(("1", "Vaquarkhan"), ("2", "Zidankhan"), ("3", "ZerinaKhan"))


val distData = sc.parallelize(data)


distData.saveToGeode("str_str_region") //str_str_region is region we created in gemfire


//example 2


val data2 = Array("VKhan","Vkhan1","Vkhan12")


val distData2 = sc.parallelize(data2)


distData2.saveToGeode("int_str_region", e => (e.length, e))

  • Verify it in gfsh:
query --query="select key,value from /str_str_region.entries"

query --query="select key,value from /str_str_region.entries"

  • Read data from GemFire to Apache Spark
val rdd = sc.geodeRegion[String, String]("str_str_region")

rdd.foreach(println)

val rdd2 = sc.geodeRegion[Int, String]("int_str_region")

rdd2.foreach(println)

Store Objects in GemFire

  • Create a Java jar ("emp.jar") and import it into GemFire.
package com.khan.vaquar;

import java.io.Serializable;

public class Emp implements Serializable {

 /**
  * @author Vaquar Khan
  */
 private static final long serialVersionUID = 1 L;

 private int id;

 private String lname;

 private String fname;

 private int age;

 private String loc;

 public Emp(int id, String lname, String fname, int age, String loc) {
  this.id = id;
  this.lname = lname;
  this.fname = fname;
  this.age = age;
  this.loc = loc;
 }

 public int getId() {
  return id;
 }

 public void setId(int id) {
  this.id = id;
 }

 public String getLname() {
  return lname;
 }

 public void setLname(String lname) {
  this.lname = lname;
 }

 public String getFname() {
  return fname;
 }

 public void setFname(String fname) {
  this.fname = fname;
 }

 public int getAge() {
  return age;
 }

 public void setAge(int age) {
  this.age = age;
 }

 public String getLoc() {
  return loc;
 }

 public void setLoc(String loc) {
  this.loc = loc;
 }

 public static long getSerialversionuid() {
  return serialVersionUID;
 }

 @Override
 public int hashCode() {
  final int prime = 31;
  int result = 1;
  result = prime * result + age;
  result = prime * result + ((fname == null) ? 0 : fname.hashCode());
  result = prime * result + id;
  result = prime * result + ((lname == null) ? 0 : lname.hashCode());
  result = prime * result + ((loc == null) ? 0 : loc.hashCode());
  return result;
 }

 @Override
 public boolean equals(Object obj) {
  if (this == obj)
   return true;
  if (obj == null)
   return false;
  if (getClass() != obj.getClass())
   return false;
  Emp other = (Emp) obj;
  if (age != other.age)
   return false;
  if (fname == null) {
   if (other.fname != null)
    return false;
  } else if (!fname.equals(other.fname))
   return false;
  if (id != other.id)
   return false;
  if (lname == null) {
   if (other.lname != null)
    return false;
  } else if (!lname.equals(other.lname))
   return false;
  if (loc == null) {
   if (other.loc != null)
    return false;
  } else if (!loc.equals(other.loc))
   return false;
  return true;
 }

 @Override
 public String toString() {
  return "Emp [id=" + id + ", lname=" + lname + ", fname=" + fname + ", age=" + age + ", loc=" + loc + "]";
 }


}
  • Create Region and deploy Jar
create region --name=emps --type=PARTITION 

deploy --jar=<path to connector project>/emp.jar
  • Spark console
import org.apache.geode.spark.connector._
import scala.util.Random

import com.khan.vaquar.Emp

val lnames = List("Smith", "Johnson", "Jones", "Miller", "Wilson", "Taylor", "Thomas", "Lee", "Green", "Parker", "Powell")

val fnames = List("John", "James", "Robert", "Paul", "George", "Kevin", "Jason", "Jerry", "Peter", "Joe", "Alice", "Sophia", "Emma", "Emily")

val locs = List("CA", "WA", "OR", "NY", "FL")

def rpick(xs: List[String]): String = xs(Random.nextInt(xs.size))

val d1 = (1 to 20).map(x => new Emp(x, rpick(lnames), rpick(fnames), 20+Random.nextInt(41), rpick(locs))).toArray

val rdd1 = sc.parallelize(d1) 

rdd1.saveToGeode("emps", e => (e.getId, e))


query --query="select * from /emps" 

SparkUI

Check Spark UI for details

  • Shutdown GemFire.
stop server --name=server1

stop server --name=server2

stop locator --name=locator1 

If you need further information, read the connector doc in GitHub.

Conclusion

In this article, we discovered some of the framework's main concepts. GemFire-Spark connector is an extraordinarily well-done architecture: it accomplishes specific tasks using modern approaches.

References:

  • https://spark.apache.org/
  • https://pivotal.io/pivotal-gemfire
  • https://github.com/Pivotal-Field-Engineering


Learn why microservices are breaking traditional APM tools that were built for monoliths.

Topics:
microservice ,apache spark ,gemfire

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}