
Quickstart: Apache Spark on Kubernetes


See how to run Apache Spark Operator on Kubernetes.


Introduction

The Apache Spark Operator for Kubernetes

Since Google launched it in 2014, Kubernetes has gained a lot of popularity along with Docker itself, and since 2016 it has become the de facto container orchestrator, established as a market standard, with managed versions available in all the major clouds [1] [2] [3] (including DigitalOcean and Alibaba).

With this popularity came various implementations and use cases of the orchestrator, among them the execution of stateful applications, including databases, in containers.

What would be the motivation to host an orchestrated database? That’s a great question. But let’s focus on the Spark Operator running workloads on Kubernetes.

The idea of a native Spark Operator came up in 2016. Before that, you couldn't run Spark jobs natively on Kubernetes except through some hacky alternatives, like running Apache Zeppelin inside Kubernetes or creating an Apache Spark cluster inside Kubernetes (using the example from the official Kubernetes organization on GitHub) that referenced the Spark workers in standalone mode.

However, native execution is far more interesting because it takes advantage of the Kubernetes scheduler, which is responsible for allocating resources, providing elasticity, and offering a simpler interface to manage Apache Spark workloads.

Considering that, the Apache Spark Operator development effort gained traction, was merged, and was released in Spark version 2.3.0, launched in February 2018.

If you're eager to read more about the Apache Spark proposal, you can head to the design document published in Google Docs.

Why Kubernetes?

As companies currently seek to reinvent themselves through the much-talked-about digital transformation in order to be competitive and, above all, to survive in an increasingly dynamic market, it is common to see approaches that include Big Data, Artificial Intelligence, and Cloud Computing [1] [2] [3].

An interesting comparison of the benefits of Cloud Computing versus on-premises servers in the context of Big Data can be read on the blog of Databricks, the company founded by the creators of Apache Spark.

As we see widespread adoption of Cloud Computing (even by companies that could afford the hardware and run on-premises), we notice that most of these cloud setups don't include an Apache Hadoop installation, since data teams (BI/Data Science/Analytics) increasingly choose tools like Google BigQuery or AWS Redshift. Therefore, it doesn't make sense to spin up a Hadoop cluster with the sole intention of using YARN as the resource manager.

An alternative is to use managed Hadoop providers such as Google Dataproc or AWS EMR to create ephemeral clusters, just to name a few options.

To better understand the design of the Spark Operator, the documentation from GCP on GitHub is a must-read.

Let’s Get Hands-On!

Warming up the Engine

Now that the word has been spread, let's get our hands dirty and show the engine running. For that, we'll use:

  • Minikube, to spin up a local, single-node Kubernetes "cluster";
  • Docker and kubectl, to build the Spark image and talk to the cluster;
  • An Apache Spark binary distribution, which ships the spark-submit and docker-image-tool.sh scripts used below.

Once the necessary tools are installed, we need to include the Apache Spark bin directory in the PATH environment variable, to ease the invocation of the Apache Spark executables. Something like the following works (adjust the path to wherever you extracted the Spark distribution):

Shell
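# Adjust SPARK_HOME to wherever you extracted the Apache Spark binary distribution
export SPARK_HOME=/path/to/your/spark-distribution
export PATH="$SPARK_HOME/bin:$PATH"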

Creating the Minikube “cluster”

Finally, to have a Kubernetes "cluster," we will start Minikube, with the intention of running SparkPi, an example from the Spark repository, just as a demonstration.

Shell

minikube start --cpus=2 \
    --memory=4g


Building the Docker image

Let's use the Minikube Docker daemon so we don't depend on an external registry (and so Docker image layers are generated only inside the VM, making cleanup easier later). Minikube has a wrapper that makes our life easier:

Shell

eval $(minikube docker-env)


With the daemon environment variables configured, we need a Docker image to run the jobs. There is a shell script in the Spark repository to help with this. Considering that our PATH was properly configured, just run:

Shell

docker-image-tool.sh -m -t latest build


FYI: The -m parameter here indicates a minikube build.
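If you want to confirm that the image landed in Minikube's Docker daemon (assuming the docker-env step above is still in effect in your shell), listing the local images should show the freshly built spark:latest:

Shell

docker images | grep spark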

Let's take the highway and execute SparkPi, using the same spark-submit command that would be used on a Hadoop Spark cluster.

However, the Spark Operator also supports defining jobs in the "Kubernetes language" using CRDs (Custom Resource Definitions); there are some examples in the operator's repository, for later.
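Just as a taste of that, a CRD-based definition of the same SparkPi job would look roughly like the sketch below. This is only a sketch: it assumes the spark-on-k8s-operator (from the GoogleCloudPlatform organization on GitHub) and its CRDs are already installed in the cluster, and the field names follow that project's published examples, so they may vary between versions.

Shell

# Sketch only: requires the spark-on-k8s-operator CRDs to be installed beforehand.
cat <<'EOF' | kubectl apply -f -
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
spec:
  type: Scala
  mode: cluster
  image: spark:latest
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-X.Y.Z.jar"
  sparkVersion: "X.Y.Z"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: "1024m"
  executor:
    cores: 1
    instances: 2
    memory: "1024m"
EOF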

Fire in the Hole!

Mind the gap between the Scala version and the .jar file name when you're parameterizing with your Apache Spark version:

Shell

spark-submit --master k8s://https://$(minikube ip):8443 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=2 \
    --executor-memory 1024m \
    --conf spark.kubernetes.container.image=spark:latest \
    local:///opt/spark/examples/jars/spark-examples_2.11-X.Y.Z.jar # here


What’s new is:

  • --master: Accepts the k8s:// prefix in the URL of the Kubernetes API server, which here resolves to https://$(minikube ip):8443. By the way, in case you want to know, $(minikube ip) is a shell command substitution (see the quick check after this list);
  • --conf spark.kubernetes.container.image=: Configures the Docker image to run in Kubernetes.
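If you want to double-check the endpoint Minikube exposes for the API server (just a sanity check; the URL is already assembled in the command above), these two commands show it:

Shell

minikube ip
kubectl cluster-info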

Sample output:

Plain Text

...
19/08/22 11:59:09 INFO LoggingPodStatusWatcherImpl: State changed,
new state: pod name: spark-pi-1566485909677-driver namespace: default
labels: spark-app-selector -> spark-20477e803e7648a59e9bcd37394f7f60,
spark-role -> driver pod uid: c789c4d2-27c4-45ce-ba10-539940cccb8d
creation time: 2019-08-22T14:58:30Z service account name: default
volumes: spark-local-dir-1, spark-conf-volume, default-token-tj7jn
node name: minikube start time: 2019-08-22T14:58:30Z container
images: spark:docker phase: Succeeded status:
[ContainerStatus(containerID=docker://e044d944d2ebee2855cd2b993c62025d
6406258ef247648a5902bf6ac09801cc, image=spark:docker,
imageID=docker://sha256:86649110778a10aa5d6997d1e3d556b35454e9657978f3
a87de32c21787ff82f, lastState=ContainerState(running=null,
terminated=null, waiting=null, additionalProperties={}),
name=spark-kubernetes-driver, ready=false, restartCount=0,
state=ContainerState(running=null,
terminated=ContainerStateTerminated(containerID=docker://e044d944d2ebe
e2855cd2b993c62025d6406258ef247648a5902bf6ac09801cc, exitCode=0,
finishedAt=2019-08-22T14:59:08Z, message=null, reason=Completed,
signal=null, startedAt=2019-08-22T14:58:32Z,
additionalProperties={}), waiting=null, additionalProperties={}),
additionalProperties={})]

19/08/22 11:59:09 INFO LoggingPodStatusWatcherImpl: Container final
statuses: Container name: spark-kubernetes-driver Container image:
spark:docker Container state: Terminated Exit code: 0


To see the job result (and the whole execution), we can run kubectl logs, passing the name of the driver pod as a parameter:

Shell

# -o name prints pod/<name>, which is exactly what kubectl logs expects here
kubectl logs $(kubectl get pods -o name | grep 'spark-pi.*-driver')


This brings output (some entries omitted) similar to:

Plain Text

...
19/08/22 14:59:08 INFO TaskSetManager: Finished task 1.0 in stage 0.0
(TID 1) in 52 ms on 172.17.0.7 (executor 1) (2/2)
19/08/22 14:59:08 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
tasks have all completed, from pool
19/08/22 14:59:08 INFO DAGScheduler: ResultStage 0 (reduce at
SparkPi.scala:38) finished in 0.957 s
19/08/22 14:59:08 INFO DAGScheduler: Job 0 finished: reduce at
SparkPi.scala:38, took 1.040608 s
Pi is roughly 3.138915694578473
19/08/22 14:59:08 INFO SparkUI: Stopped Spark web UI at
http://spark-pi-1566485909677-driver-svc.default.svc:4040
19/08/22 14:59:08 INFO KubernetesClusterSchedulerBackend: Shutting
down all executors
19/08/22 14:59:08 INFO
KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking
each executor to shut down
19/08/22 14:59:08 WARN ExecutorPodsWatchSnapshotSource: Kubernetes
client has been closed (this is expected if the application is
shutting down.)
19/08/22 14:59:08 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
19/08/22 14:59:08 INFO MemoryStore: MemoryStore cleared
19/08/22 14:59:08 INFO BlockManager: BlockManager stopped
19/08/22 14:59:08 INFO BlockManagerMaster: BlockManagerMaster stopped
19/08/22 14:59:08 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
19/08/22 14:59:08 INFO SparkContext: Successfully stopped SparkContext
19/08/22 14:59:08 INFO ShutdownHookManager: Shutdown hook called
19/08/22 14:59:08 INFO ShutdownHookManager: Deleting directory
/tmp/spark-aeadc6ba-36aa-4b7e-8c74-53aa48c3c9b2
19/08/22 14:59:08 INFO ShutdownHookManager: Deleting directory
/var/data/spark-084e8326-c8ce-4042-a2ed-75c1eb80414a/spark-ef8117bf-90
d0-4a0d-9cab-f36a7bb18910
...


The result appears in:

Plain Text

19/08/22 14:59:08 INFO DAGScheduler: Job 0 finished: reduce at
SparkPi.scala:38, took 1.040608 s
Pi is roughly 3.138915694578473


Finally, let’s delete the VM that Minikube generates, to clean up the environment (unless you want to keep playing with it):

Shell

minikube delete


Last Words

I hope your curiosity got sparked and that some ideas for further development of your Big Data workloads came up. If you have any questions or suggestions, don't hesitate to share them in the comments section.

Topics:
big data, cloud, container, data engineering, docker, hadoop, kubernetes, kubernetes operators, spark

Published at DZone with permission of Matheus Cunha . See the original article here.

