
Augmented Analytics With PySpark and Sentiment Analysis

A step-by-step tutorial on how you can leverage sentiment analysis to enrich tweet data with PySpark and get a feel for the overall sentiment towards COVID-19.

By Adi Polak · Updated Oct. 20, 2020 · Tutorial

In this tutorial, you will learn how to enrich COVID-19 tweet data with a positive sentiment score. You will leverage PySpark and Azure Cognitive Services and learn about augmented analytics.

What Is Augmented Analytics?

According to Gartner's report, augmented analytics is the use of technologies such as machine learning and AI to assist with data preparation and insight generation. Its main goal is to help more people get value out of data and generate insights in an easy, conversational manner. For our example, we extract the positive sentiment score out of a tweet to help understand the overall sentiment towards COVID-19.

What Is PySpark?

PySpark is the framework we use to work with Apache Spark from Python. Learn more about it here.
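
If you run Spark locally (for example, with the jupyter/pyspark-notebook Docker image mentioned in the prerequisites below), here is a minimal sketch, not part of the original article, of creating a SparkSession so the snippets in this tutorial have a spark object to work with:

Python
from pyspark.sql import SparkSession

# create (or reuse) a local Spark session; Databricks notebooks already provide `spark`
spark = SparkSession.builder \
    .appName("covid19-tweets-sentiment") \
    .master("local[*]") \
    .getOrCreate()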

What Is Sentiment Analysis?

Sentiment analysis is part of NLP (natural language processing); it combines text analytics, computational linguistics, and more to systematically study affective states and subjective information, such as tweets. In our example, we will see how to extract a positive sentiment score from COVID-19 tweet text. In this tutorial, you are going to leverage Azure Cognitive Services, which gives us sentiment analysis capabilities out of the box. When working with it, we can use the TextAnalyticsClient client library or the REST API. Today, you will use the REST API, as it gives us more flexibility.

Prerequisites

  • An Apache Spark environment with notebooks. It can be Databricks, or you can start a local environment with Docker by running: docker run -it -p 8888:8888 jupyter/pyspark-notebook
  • An Azure free account
  • The Kaggle COVID-19 tweets dataset
  • A Cognitive Services free account (check out the picture below)

Sentiment for Tweets package

Step-by-Step Tutorial — Full Data Pipeline

In this step-by-step tutorial, you will learn how to load the data with PySpark, create a user-defined function (UDF) to connect to the Sentiment Analysis API, add the sentiment data, and save everything to Parquet format files.

You now need to upload the data to your Apache Spark environment, whether it's Databricks or a PySpark Jupyter notebook. For Databricks use this, for Jupyter use this.

For both cases, you will need the file location file_location = "/FileStore/tables/covid19_tweets.csv", so make sure to keep a note of it.

Loading the Data With PySpark

This is how you load the data into a PySpark DataFrame object. Spark will try to infer the schema directly from the CSV. One thing you will notice is that when working with CSV and inferring the schema, Spark often reads most columns as strings.

Python
inputDF = spark.read.\
        format("com.databricks.spark.csv").\
        option("header", "true").\
        option("inferSchema", "true").load("/FileStore/tables/covid19_tweets.csv")



Provide a More Accurate Schema for Our Data

Here you define the expectedSchema and later cast the data to match it. You will use StructType and StructField, which are Spark SQL data types that help you define the schema.

The withColumn function creates a new DataFrame with the desired column, according to the name and value you provide it with.

Python
from pyspark.sql.types import *
from pyspark.sql.functions import *

# create expected schema
expectedSchema = StructType([
  StructField("user_name", StringType(), True),
  StructField("user_location", StringType(), True),
  StructField("user_description", StringType(), True),
  StructField("user_created", StringType(), True),
  StructField("user_followers", FloatType(), True),
  StructField("user_friends", FloatType(), True),
  StructField("user_favourites", FloatType(), True),
  StructField("user_verified", BooleanType(), True),
  StructField("date", StringType(), True),
  StructField("text", StringType(), True),
  StructField("hashtags", StringType(), True),
  StructField("source", StringType(), True),
  StructField("is_retweet", BooleanType(), True)
])



Now, let's create your new DataFrame with the right schema!

Notice that you assign the new DataFrame back to inputDF, which means you will no longer have access to the old DataFrame.

Python
# Set data types - cast the data in columns to match the schema
inputDF = inputDF \
  .withColumn("user_name", inputDF["user_name"].cast("string")) \
  .withColumn("user_location", inputDF["user_location"].cast("string")) \
  .withColumn("user_description", inputDF["user_description"].cast("string")) \
  .withColumn("user_created", inputDF["user_created"].cast("string")) \
  .withColumn("user_followers", inputDF["user_followers"].cast("float")) \
  .withColumn("user_friends", inputDF["user_friends"].cast("float")) \
  .withColumn("user_favourites", inputDF["user_favourites"].cast("float")) \
  .withColumn("user_verified", inputDF["user_verified"].cast("boolean")) \
  .withColumn("date", inputDF["date"].cast("string")) \
  .withColumn("text", inputDF["text"].cast("string")) \
  .withColumn("hashtags", inputDF["hashtags"].cast("string")) \
  .withColumn("source", inputDF["source"].cast("string")) \
  .withColumn("is_retweet", inputDF["is_retweet"].cast("boolean"))



Connect to Sentiment Analysis With the REST API

To connect to and consume the sentiment analysis service, we need to provide the sentiment analysis endpoint and access key. Both can be found in the Azure portal.

You can find the endpoint either in the Overview section or under Keys and Endpoints.

Copying the endpoint

Finding the access key:

Copying API key

After finding the key and endpoint, for production and working in a team, you need to store them somewhere safe. Avoid saving keys as free text in code; this is not safe. You might end up with hackers mining bitcoin in your cloud environment.

For Databricks, you can leverage dbutils.secrets functionality. This is how to set it up.

If you work locally with a Jupyter PySpark notebook, you can use plain text, but remember to remove it before you commit your code to a git repo.
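
A safer local alternative, not covered in this article, is to keep the endpoint and key in environment variables and read them at runtime; the variable names below are only an example:

Python
import os

# hypothetical environment variable names; export them in your shell before starting Jupyter
sentimentEndpoint = os.environ["SENTIMENT_ENDPOINT"]
sentimentAccessKeys = os.environ["SENTIMENT_ACCESS_KEY"]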

This is how to work with dbutils, providing it with the scope and key name.

In this code snippet, the scope is named mle2ebigdatakv, and the key names are sentimentEndpoint and sentimentAccessKeys.

Python
# provide endpoint and key
sentimentEndpoint = dbutils.secrets.get(scope="mle2ebigdatakv", key="sentimentEndpoint")
sentimentAccessKeys = dbutils.secrets.get(scope="mle2ebigdatakv", key="sentimentAccessKeys")



Let's build the connection itself. The sentiment analysis service expects to receive a document-like object, so you will work with a Python dictionary and build a doc request with an ID. The ID has to be unique for every request.

Notice the language_api_url variable here; this is where you construct the request URL for Cognitive Services, asking for Text Analytics sentiment, version 3.0.

Python
import requests

# build the REST API request URL with language_api_url
language_api_url = sentimentEndpoint + "/text/analytics/v3.0/sentiment"
headers = {"Ocp-Apim-Subscription-Key": sentimentAccessKeys}

def constructDocRequest(text):
  docRequest = {}
  doc = {}
  doc["id"] = text
  doc["text"] = text
  docRequest["documents"] = [doc]
  return docRequest


 

Try running it with some text; you will see that the response consists of sentiment scores for positive, neutral, and negative.
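
Note that constructDocRequest only builds the request body; to actually get a response back, you still need to POST it to the endpoint. Here is a minimal sketch, not shown in the original code, of sending a single request with the requests library:

Python
# send one document request to the sentiment endpoint and inspect the JSON response
response = requests.post(language_api_url,
                         headers=headers,
                         json=constructDocRequest("covid19 is not scary at all"))
print(response.json())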

This is how a response is structured:

JSON
{
    "documents": [
        {
            "id": "1",
            "sentiment": "positive",
            "confidenceScores": {
                "positive": 1.0,
                "neutral": 0.0,
                "negative": 0.0
            },
            "sentences": [
                {
                    "sentiment": "positive",
                    "confidenceScores": {
                        "positive": 1.0,
                        "neutral": 0.0,
                        "negative": 0.0
                    },
                    "offset": 0,
                    "length": 66,
                    "text": "covid19 is not scary at all, it't actualy an oppurtiniry to thrive"
                }
            ],
            "warnings": []
        }
    ],
    "errors": [],
    "modelVersion": "2020-04-01"
}



Let's build Python functionality to extract the sentiment and register the function with PySpark through the UDF (user-defined function) API.

You need to make sure you actually get a document back from the REST API, and also protect your functionality from sending empty text to the sentiment analysis service, as that will result in an error.

This is how you connect everything together:

Python
from pyspark.sql.functions import udf

# extract the requested sentiment score out of the returned json doc
def extractSentiment(doc, sentimentType):
  if doc == {} or not 'documents' in doc:
    return 0.0
  return float(doc['documents'][0]['confidenceScores'][sentimentType])

# function for extracting the positive sentiment
def getPositiveSentiment(text):
  if text is None or bool(text.strip()) == False:
    return 0.0
  # call the REST API with the constructed document request and parse the JSON response
  response = requests.post(language_api_url, headers=headers, json=constructDocRequest(text))
  positive = extractSentiment(response.json(), 'positive')
  return positive

# creating the udf function pointer (the score is a float)
get_positive_sentiment = udf(getPositiveSentiment, FloatType())

# create a new DF with a new column representing the positive sentiment score
enrichedDF_positiveSentiment = inputDF.withColumn('positive_sentiment', get_positive_sentiment(inputDF["text"]))



After enriching your data, it's important to save it to storage for future needs. You will save it in Parquet format, which will keep the schema intact. The Apache Parquet format is designed for efficient columnar storage of data compared to row-based files such as CSV, since it allows better compression and faster scanning of a subset of columns later on. With CSV, we have to read the whole file and all of its columns in order to query only a subset of them.

This is how it's done with PySpark:

Python
# Save the processed data to parquet for the Data Science team to explore and build ML models
enrichedDF_positiveSentiment.write \
  .format("parquet") \
  .save("/DATALAKE/COVID19_TWEETS/REFINED/WITH_SENTIMENT/")



You can decide on the name and structure of your data, but make sure to note that this data now contains a new column with the positive sentiment score.

This data can later be used in various data visualization tools or by researchers.
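
For example, a downstream consumer could read the enriched Parquet files back and compute the average positive sentiment per day. A minimal sketch, assuming the path and column names used above and that the date column parses with to_date:

Python
# read the enriched data back
enriched = spark.read.parquet("/DATALAKE/COVID19_TWEETS/REFINED/WITH_SENTIMENT/")

# average positive sentiment per day (to_date truncates the string timestamp to a date)
daily_sentiment = enriched \
    .groupBy(to_date(col("date")).alias("day")) \
    .agg(avg("positive_sentiment").alias("avg_positive_sentiment")) \
    .orderBy("day")

daily_sentiment.show(10)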

Issues That May Occur

JSON
{
    "error": {
        "code": "InvalidRequest",
        "innererror": {
            "code": "EmptyRequest",
            "message": "Request body must be present."
        },
        "message": "Invalid Request."
    }
}



Pay attention to the error message. When you see this kind of error, it might be that you are out of quota with Cognitive Services. If you are learning about the service and trying it out, it's better to use a few samples rather than the whole dataset, as you might run out of quota on the free tier, which is good for up to 5K transactions.
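
One simple way to stay within the free tier while experimenting is to enrich only a small sample of the tweets. A short sketch, not from the original article, reusing the UDF defined earlier:

Python
# take a small sample of tweets while experimenting, to stay within the free-tier quota
sampleDF = inputDF.limit(100)

enrichedSampleDF = sampleDF.withColumn('positive_sentiment',
                                       get_positive_sentiment(sampleDF["text"]))
enrichedSampleDF.select("text", "positive_sentiment").show(5, truncate=False)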

That's it! 

This tutorial walked you through how to leverage existing REST API services to enrich your data for future work and augmented analytics. 

To learn more, check out the GitHub repo. I'm happy to take your questions and follow up with you on Twitter.
