Fast Data Access Using GemFire and Apache Spark (Part 1): Introduction
An introductory tutorial on Pivotal's GemFire framework and how it can work in tandem with Apache Spark in your microservices.
This is the first article of the series, where we will cover the basic concepts of Apache Spark and GemFire and set up a connection between them.
Future articles in this series:
- Microservices with GemFire, loading data in batches using Apache Spark and Hive.
- Microservices with GemFire using CQRS, getting data into Hive and Apache Spark, and syncing streaming data into GemFire.
"Pivotal GemFire is a distributed data management platform. Pivotal GemFire is designed for many diverse data management situations but is especially useful for high-volume, latency-sensitive, mission-critical, transactional systems, high-performance, real-time apps." - VMWare.com
"Apache Spark is an open-source distributed general-purpose cluster computing framework with (mostly) in-memory data processing engine that can do ETL, analytics, machine learning and graph processing on large volumes of data at rest (batch processing) or in motion (streaming processing) with rich concise high-level APIs for the programming languages: Scala, Python, Java, R, and SQL." - Jacek Laskowski
Microservices are small, autonomous services that work together.
Big Data vs. Fast Data:
Big data is data at rest, typically stored in a data lake, sized in petabytes, and best kept in HDFS, HBase, or cloud storage because it requires a lot of disk space. Fast data refers to smaller data sets used in near-real time or real time to solve a problem. Microservices require fast data; big data serves the grander scheme of things.
GemFire Connection with Spark:
Prerequisites: Java 8, basic knowledge of GemFire and Apache Spark, and Windows 7 or above.
Operating System: Windows (64-bit)
1. Download the GemFire Apache Spark connector.
2. Download Gradle.
3. Download GemFire.
4. Download Apache Spark.
5. Download Winutils.
Configure Spark:
- Make sure JAVA_HOME is set as an environment variable.
- Unzip Spark 2.4 and make sure there are no spaces in the folder name.
- Set the SPARK_HOME environment variable in Windows.
- Add %SPARK_HOME%\bin to the PATH environment variable.
- Create a Hadoop folder (e.g. C:\hadoop) and copy winutils.exe into its bin subfolder.
- Add the environment variable HADOOP_HOME=C:\hadoop (a command-line sketch of these settings appears after this list).
- Go to <spark-root-dir>/conf/spark-defaults.conf and add
spark.geode.locators=localhost[55221]
- Go to <spark-root-dir>/conf/log4j.properties and set:
log4j.rootCategory=WARN, console
- Open a command prompt and run spark-shell.
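For reference, here is a minimal sketch of setting these variables from a Windows command prompt. The install paths below are placeholders; adjust them to your own directories, and open a new prompt afterwards so the changes take effect. The PATH entries (%SPARK_HOME%\bin and %HADOOP_HOME%\bin) are easiest to add through System Properties > Environment Variables:
setx JAVA_HOME "C:\Program Files\Java\jdk1.8.0_201"
setx SPARK_HOME "C:\spark-2.4.0-bin-hadoop2.7"
setx HADOOP_HOME "C:\hadoop"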
Configure GemFire
Now we are going to configure GemFire.
- Unzip GemFire and go to the bin directory under the root folder, e.g. "Gemfire 9.2.2\bin".
- Double-click gfsh.bat to open the gfsh terminal. If you are new to GemFire, spend some time with the "GemFire in 15 Minutes or Less" quick-start tutorial.
- Now run the following command on the gfsh terminal:
start locator --name=locator1 --port=55221
start server --name=server1 --locators=localhost[55221] --server-port=0
start server --name=server2 --locators=localhost[55221] --server-port=0
Create Region
GemFire has two different region types: replicated and partitioned. A replicated region keeps a full copy of the dataset on each server, while a partitioned region spreads its dataset across multiple servers and may keep redundant copies for high availability.
create region --name=str_str_region --type=PARTITION --key-constraint=java.lang.String --value-constraint=java.lang.String
create region --name=int_str_region --type=PARTITION --key-constraint=java.lang.Integer --value-constraint=java.lang.String
# Make sure the regions were created
list regions
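For comparison, a replicated region (not used in this tutorial) is created the same way by changing the type; the region name here is just an example:
create region --name=demo_replicated_region --type=REPLICATE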
- Unzip Gradle and set the environment variable GRADLE_HOME=<path>.
- Now unzip "geode-spark-connector-develop", go to its root directory, and run "./gradlew clean build -x test". Inside the build folders you will find the following jar files:
- geode-functions-1.0.0.jar
- geode-spark-connector.jar
- Deploy the Spark Geode Connector's geode-functions jar:
deploy --jar=<path to connector project>/geode-functions/build/libs/geode-functions-1.0.0.jar
list deployed
We are all set to connect Apache Spark and GemFire. Run the following command; I have configured it to work in the Windows command terminal (change the jar paths as needed):
spark-shell --master local[*] --jars C:\Users\vaquarkhan\Documents\code\geode-dependencies.jar,C:\Users\vaquarkhan\Documents\code\geode-spark-connector.jar,C:\Users\vaquarkhan\Documents\code\geode-core-9.2.2.jar,C:\Users\vaquarkhan\Documents\code\log4j-api-2.8.2.jar,C:\Users\vaquarkhan\Documents\code\log4j-core-2.8.2.jar,C:\Users\vaquarkhan\Documents\code\commons-validator-1.6.jar,C:\Users\vaquarkhan\Documents\code\fastutil-7.1.0.jar,C:\Users\vaquarkhan\Documents\code\shiro-core-1.3.2.jar
- Check the spark.geode.locators property in the Spark shell:
sc.getConf.get("spark.geode.locators")
- Next, import the connector package into the Spark shell:
import org.apache.geode.spark.connector._
- Create an Apache Spark RDD and save it to GemFire, then fetch it back into Apache Spark:
//example 1
val data = Array(("1", "Vaquarkhan"), ("2", "Zidankhan"), ("3", "ZerinaKhan"))
val distData = sc.parallelize(data)
distData.saveToGeode("str_str_region") //str_str_region is region we created in gemfire
//example 2
val data2 = Array("VKhan","Vkhan1","Vkhan12")
val distData2 = sc.parallelize(data2)
distData2.saveToGeode("int_str_region", e => (e.length, e)) //derive each entry's key from the value's length
- Verify it in gfsh:
query --query="select key,value from /str_str_region.entries"
query --query="select key,value from /int_str_region.entries"
- Read data from GemFire into Apache Spark:
val rdd = sc.geodeRegion[String, String]("str_str_region")
rdd.foreach(println)
val rdd2 = sc.geodeRegion[Int, String]("int_str_region")
rdd2.foreach(println)
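Once the data is back in Spark, the pairs behave like any other RDD. A minimal sketch of ordinary RDD processing on the data we just read (nothing connector-specific here):
// count entries whose value starts with "V"
val vCount = rdd.filter { case (_, value) => value.startsWith("V") }.count()
println(s"Values starting with V: $vCount")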
Store Objects in GemFire
- Create a Java jar ("emp.jar") containing the Emp class below and deploy it to GemFire (a build sketch follows the class listing).
package com.khan.vaquar;

import java.io.Serializable;

/**
 * @author Vaquar Khan
 */
public class Emp implements Serializable {

    private static final long serialVersionUID = 1L;

    private int id;
    private String lname;
    private String fname;
    private int age;
    private String loc;

    public Emp(int id, String lname, String fname, int age, String loc) {
        this.id = id;
        this.lname = lname;
        this.fname = fname;
        this.age = age;
        this.loc = loc;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getLname() {
        return lname;
    }

    public void setLname(String lname) {
        this.lname = lname;
    }

    public String getFname() {
        return fname;
    }

    public void setFname(String fname) {
        this.fname = fname;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getLoc() {
        return loc;
    }

    public void setLoc(String loc) {
        this.loc = loc;
    }

    public static long getSerialversionuid() {
        return serialVersionUID;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + age;
        result = prime * result + ((fname == null) ? 0 : fname.hashCode());
        result = prime * result + id;
        result = prime * result + ((lname == null) ? 0 : lname.hashCode());
        result = prime * result + ((loc == null) ? 0 : loc.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Emp other = (Emp) obj;
        if (age != other.age)
            return false;
        if (fname == null) {
            if (other.fname != null)
                return false;
        } else if (!fname.equals(other.fname))
            return false;
        if (id != other.id)
            return false;
        if (lname == null) {
            if (other.lname != null)
                return false;
        } else if (!lname.equals(other.lname))
            return false;
        if (loc == null) {
            if (other.loc != null)
                return false;
        } else if (!loc.equals(other.loc))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "Emp [id=" + id + ", lname=" + lname + ", fname=" + fname + ", age=" + age + ", loc=" + loc + "]";
    }
}
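One way to build emp.jar from the class above, assuming the JDK's javac and jar tools are on the PATH and the source file sits at src\com\khan\vaquar\Emp.java (the paths are illustrative):
mkdir classes
javac -d classes src\com\khan\vaquar\Emp.java
jar cf emp.jar -C classes .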
- Create a region and deploy the jar:
create region --name=emps --type=PARTITION
deploy --jar=<path to connector project>/emp.jar
- In the Spark console:
import org.apache.geode.spark.connector._
import scala.util.Random
import com.khan.vaquar.Emp
val lnames = List("Smith", "Johnson", "Jones", "Miller", "Wilson", "Taylor", "Thomas", "Lee", "Green", "Parker", "Powell")
val fnames = List("John", "James", "Robert", "Paul", "George", "Kevin", "Jason", "Jerry", "Peter", "Joe", "Alice", "Sophia", "Emma", "Emily")
val locs = List("CA", "WA", "OR", "NY", "FL")
def rpick(xs: List[String]): String = xs(Random.nextInt(xs.size))
val d1 = (1 to 20).map(x => new Emp(x, rpick(lnames), rpick(fnames), 20+Random.nextInt(41), rpick(locs))).toArray
val rdd1 = sc.parallelize(d1)
rdd1.saveToGeode("emps", e => (e.getId, e))
- Verify it in gfsh:
query --query="select * from /emps"
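The stored objects can also be read back into Spark the same way as the string regions, using the Emp class we deployed. A minimal sketch (the key type is Int because getId was used as the key when saving):
val empRdd = sc.geodeRegion[Int, Emp]("emps")
// e.g. print only employees older than 40
empRdd.filter { case (_, emp) => emp.getAge > 40 }.foreach(println)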
Spark UI
Check the Spark UI for details of the jobs that ran.
- Shut down GemFire:
stop server --name=server1
stop server --name=server2
stop locator --name=locator1
If you need further information, read the connector documentation on GitHub.
Conclusion
In this article, we covered the basic concepts of GemFire and Apache Spark, set up both on Windows, and moved data between them. The GemFire-Spark connector is a well-designed piece of architecture: it accomplishes these specific tasks using modern approaches.
References:
- https://spark.apache.org/
- https://pivotal.io/pivotal-gemfire
- https://github.com/Pivotal-Field-Engineering