DZone

Execute Spark Applications on Databricks Using the REST API

Let's get your Spark apps up and running on Databricks.

By Bipin Patwardhan · Mar. 28, 21 · Tutorial

Introduction

Many of us are used to executing Spark applications with the 'spark-submit' command, but with the growing popularity of Databricks, this seemingly simple activity is moving into the background. Databricks has made it very easy to provision Spark-enabled VMs on the two most popular cloud platforms, AWS and Azure, and a couple of weeks ago, Databricks announced its availability on GCP as well. The beauty of the platform is how easy it is to get started on it. While Spark application development will continue to have its challenges, depending on the problem being addressed, Databricks has taken away the pain of setting up and managing your own Spark cluster.

Using Databricks

Once we are registered, the Databricks platform allows us to define a cluster of one or more VMs, with configurable RAM and executor specifications. We can also define a cluster that launches a minimum number of VMs at startup and then scales up to a maximum number of VMs as required. After defining the cluster, we have to define jobs and notebooks. Notebooks contain the actual code executed on the cluster. We need to assign notebooks to jobs, as the Databricks cluster executes jobs (and not notebooks). Databricks also allows us to set up the cluster so that it downloads additional JARs and/or Python packages during cluster startup. We can also upload and install our own packages (I used a Python wheel).
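The cluster and job definitions described above can also be expressed as JSON payloads for the Databricks Clusters API (clusters/create) and Jobs API 2.0 (jobs/create). The sketch below only builds those payloads as Python dictionaries and does not call Databricks; the function names, cluster name, runtime version, node type, cluster ID, notebook path, and wheel location are all hypothetical placeholders, and the field names should be checked against the API version your workspace uses.

```python
def build_cluster_spec(cluster_name, spark_version, node_type_id, min_workers, max_workers):
    """Payload for clusters/create: an autoscaling cluster between min and max workers."""
    # Basic sanity check on the autoscaling range
    if not 0 <= min_workers <= max_workers:
        raise ValueError("expected 0 <= min_workers <= max_workers")
    return {
        "cluster_name": cluster_name,
        "spark_version": spark_version,
        "node_type_id": node_type_id,
        "autoscale": {"min_workers": min_workers, "max_workers": max_workers},
    }


def build_job_settings(job_name, notebook_path, cluster_id, wheel_path=None, base_parameters=None):
    """Payload for jobs/create (API 2.0): a job that runs a notebook on an existing cluster."""
    settings = {
        "name": job_name,
        "existing_cluster_id": cluster_id,
        # The notebook task is what ties a notebook to the job
        "notebook_task": {
            "notebook_path": notebook_path,
            "base_parameters": base_parameters or {},
        },
    }
    if wheel_path is not None:
        # Ask Databricks to install our own Python wheel before the job runs
        settings["libraries"] = [{"whl": wheel_path}]
    return settings


# Usage with hypothetical values
cluster_spec = build_cluster_spec("recon-cluster", "7.3.x-scala2.12", "i3.xlarge", 1, 4)
job_settings = build_job_settings(
    "data-reconciliation",
    "/Shared/reconciliation",
    "0123-456789-abcde123",
    wheel_path="dbfs:/FileStore/wheels/recon-0.1-py3-none-any.whl",
)
```

Keeping the payloads in plain dictionaries makes it easy to review or version the cluster and job definitions before sending them with whichever client you use.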

Recently, I developed some functionality (data reconciliation, data validation, and data profiling) using Spark. Initially, we developed the functionality against a local Spark installation and things were fine. Although I knew that the design would need to be reworked, we went ahead with the local implementation. Why the design change? Each piece of functionality was fronted by its own microservice. The microservices were going to be deployed using Docker and Kubernetes, and we could not deploy the Spark application on that Docker and Kubernetes setup, so the design had to change: the Spark application needed to run on a dedicated Spark instance.

To make this happen, we had two options: Apache Livy and Databricks. For implementation flexibility, and to cater to different customer infrastructure, we decided to implement both. In an earlier article (Execute Spark Applications With Apache Livy), I described how we can execute Spark applications using Apache Livy's REST interface. This article covers the Databricks option.

Using Databricks Remotely

Like Apache Livy, Databricks also provides a REST API. As our implementation was in Python, we used the databricks_api package. While the REST API makes it simple to invoke a Spark application available on a Databricks cluster, I realized that all three services ended up with the same code: the mechanism for setting up and invoking the Databricks API was identical, and only the job names and the parameters passed during invocation differed. Hence, I wrapped the common functionality into a helper class.
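Under the hood, a client such as databricks_api sends authenticated HTTP requests to the workspace. As a rough sketch (not the package's actual code), the request behind a "run now" call can be built with only the standard library: a POST to the Jobs API 2.0 endpoint /api/2.0/jobs/run-now, with the personal access token sent as a Bearer token. The function name, host, token, job ID, and parameter in the usage example are hypothetical, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_run_now_request(host, token, job_id, notebook_params=None):
    """Build (but do not send) a POST request to /api/2.0/jobs/run-now."""
    body = {"job_id": job_id}
    if notebook_params:
        body["notebook_params"] = notebook_params
    return urllib.request.Request(
        url=host.rstrip("/") + "/api/2.0/jobs/run-now",
        data=json.dumps(body).encode("utf-8"),
        headers={
            # Databricks personal access tokens are sent as Bearer tokens
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Usage with hypothetical values
req = build_run_now_request(
    "https://example.cloud.databricks.com/", "<personal-access-token>", 42, {"date": "2021-03-01"}
)
# Sending it would return JSON that includes the "run_id" of the new run:
# with urllib.request.urlopen(req) as resp:
#     run_id = json.load(resp)["run_id"]
```

A wrapper package saves us from hand-building requests like this for every endpoint, which is why we used databricks_api rather than raw HTTP.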

Helper Class

Here is the helper class to interact with Spark applications hosted on Databricks.

```python
import time
# . . . other imports

from databricks_api import DatabricksAPI

# Run life-cycle states after which a job run will not change state again
TERMINAL_LIFE_CYCLE_STATES = ("TERMINATED", "SKIPPED", "INTERNAL_ERROR")


class DatabricksRunner:
    def __init__(self):
        self.databricks = None
        self.host_id = None
        self.access_token = None
        self.cluster_is_running = False
        self.cluster_is_defined = False
        self.databricks_job_id = None
        self.job_run_id = None

    @staticmethod
    def find_job(job_list, name):
        # Return the job whose settings.name matches the given name, or None
        for job in job_list.get("jobs", []):
            if job.get("settings", {}).get("name") == name:
                return job
        return None

    def connect_to_cluster(self, host_id, access_token, cluster_name, headers=None):
        # Connect to the workspace and check that the named cluster exists and is running
        self.host_id = host_id
        self.access_token = access_token
        self.databricks = DatabricksAPI(host=host_id, token=access_token)

        clusters = self.databricks.cluster.list_clusters(headers=headers)
        self.cluster_is_defined = False
        self.cluster_is_running = False
        for c in clusters.get("clusters", []):
            if c["cluster_name"] == cluster_name:
                self.cluster_is_defined = True
                # Any state other than RUNNING (PENDING, TERMINATED, ...) counts as not running
                self.cluster_is_running = c["state"] == "RUNNING"
                break

        return self.cluster_is_defined and self.cluster_is_running

    def get_job_id(self, job_name, headers=None):
        # Look up the job by name and remember its ID for run_job()
        if not self.cluster_is_running:
            return None

        job_list = self.databricks.jobs.list_jobs(headers=headers)
        job_details = self.find_job(job_list, job_name)
        if job_details is None:
            return None
        self.databricks_job_id = job_details["job_id"]
        return self.databricks_job_id

    def run_job(self, notebook_params=None, jar_params=None, python_params=None,
                spark_submit_params=None, headers=None):
        # Trigger a run of the job and remember the run ID for wait_for_job()
        ret_val = self.databricks.jobs.run_now(
            job_id=self.databricks_job_id,
            jar_params=jar_params,
            notebook_params=notebook_params,
            python_params=python_params,
            spark_submit_params=spark_submit_params,
            headers=headers,
        )
        self.job_run_id = ret_val["run_id"]
        return True

    def wait_for_job(self, poll_time, timeout_value, headers=None):
        # Poll the run every poll_time seconds, for at most timeout_value seconds
        run_time = 0
        while True:
            run_response = self.databricks.jobs.get_run(run_id=self.job_run_id, headers=headers)
            state = run_response.get("state", {})
            life_cycle_state = state.get("life_cycle_state")
            result_state = state.get("result_state")
            if life_cycle_state in TERMINAL_LIFE_CYCLE_STATES or run_time >= timeout_value:
                break
            time.sleep(poll_time)
            run_time += poll_time

        if life_cycle_state not in TERMINAL_LIFE_CYCLE_STATES:
            return "job is still running"
        # The API reports result states in upper case (SUCCESS, FAILED, ...)
        if result_state == "SUCCESS":
            return "success"
        elif result_state == "FAILED":
            return "failed"
        else:
            return "state not recognized"
```
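The polling in wait_for_job() follows the Jobs API run state model: life_cycle_state moves through values such as PENDING and RUNNING until it reaches a terminal value (TERMINATED, SKIPPED, or INTERNAL_ERROR), and result_state (for example SUCCESS, FAILED, TIMEDOUT, or CANCELED) then reports the outcome. To check that logic without a Databricks workspace, one option is to pull it out into a function that takes the state-fetching call as a parameter. This is a sketch for testing purposes only; wait_for_run and its parameters are hypothetical names, not part of databricks_api.

```python
import time

TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def wait_for_run(fetch_state, poll_time, timeout_value, sleep=time.sleep):
    """Poll fetch_state() until the run is terminal or the timeout has passed.

    fetch_state must return the "state" object of a runs/get response.
    Returns (life_cycle_state, result_state); result_state is None on timeout.
    """
    waited = 0
    while True:
        state = fetch_state()
        life_cycle_state = state.get("life_cycle_state")
        if life_cycle_state in TERMINAL_STATES:
            return life_cycle_state, state.get("result_state")
        if waited >= timeout_value:
            return life_cycle_state, None
        sleep(poll_time)
        waited += poll_time


# Usage: replay canned states and skip the real sleep
states = iter([
    {"life_cycle_state": "PENDING"},
    {"life_cycle_state": "RUNNING"},
    {"life_cycle_state": "TERMINATED", "result_state": "SUCCESS"},
])
result = wait_for_run(lambda: next(states), poll_time=60, timeout_value=600, sleep=lambda s: None)
```

Against a real workspace, fetch_state could be something like lambda: runner.databricks.jobs.get_run(run_id=runner.job_run_id)["state"]; injecting sleep keeps tests fast.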



Using the Helper Class

After defining the class, we can run Spark jobs as follows:

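```python
host_id = ...              # assign value (Databricks workspace host)
access_token = ...         # assign value (personal access token)
cluster_name = ...         # assign value
headers = None             # optional dict of extra HTTP headers
job_name = ...             # assign value
notebook_params = {}       # dict, sent as JSON
jar_params = []            # list, sent as JSON
python_params = []         # list, sent as JSON
spark_submit_params = []   # list, sent as JSON

runner = DatabricksRunner()
if not runner.connect_to_cluster(host_id, access_token, cluster_name, headers):
    raise RuntimeError(f"cluster '{cluster_name}' is not defined or not running")
if runner.get_job_id(job_name) is None:
    raise RuntimeError(f"job '{job_name}' not found")
runner.run_job(notebook_params, jar_params, python_params, spark_submit_params, headers)
ret_val = runner.wait_for_job(60, 600)  # poll every 60 s, give up after 600 s
print(ret_val)
```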
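In the Jobs API, notebook_params is a map of string keys to string values that the notebook reads as widgets, while python_params and spark_submit_params are lists of strings passed as command-line arguments. Assuming that string-only shape, one way to pass structured settings is to JSON-encode every non-string value on the caller side and decode it in the notebook with json.loads(dbutils.widgets.get("<name>")). The helper names and example keys below are hypothetical.

```python
import json


def to_notebook_params(params):
    """Convert a dict of arbitrary values into a string-to-string map for notebook_params.

    Strings pass through unchanged; anything else (numbers, lists, dicts) is JSON-encoded.
    """
    return {
        str(key): value if isinstance(value, str) else json.dumps(value)
        for key, value in params.items()
    }


def to_cli_params(values):
    """Convert values into the list of strings used for python_params or spark_submit_params."""
    return [v if isinstance(v, str) else json.dumps(v) for v in values]


# Usage with hypothetical parameter names
notebook_params = to_notebook_params(
    {"source_table": "sales", "threshold": 0.05, "columns": ["id", "amount"]}
)
python_params = to_cli_params(["--date", "2021-03-01", 3])
```

Encoding on the caller side keeps the notebook's decoding rule simple: every widget value is either a plain string or a JSON document.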



Conclusion

The Databricks API makes it easy to interact with Databricks jobs remotely. Not only can we run jobs on the Databricks cluster, but we can also monitor their execution state.

Opinions expressed by DZone contributors are their own.
