Build a Data Analytics Platform With Flask, SQL, and Redis
Build a Flask-based web app with dynamic population-threshold queries, Redis caching for faster repeated queries, and a secure, scalable architecture.
In this article, I’ll walk through the development of a Flask-based web application that interacts with a SQL Server database to analyze population data. The application allows users to query population ranges, fetch counties by state, and retrieve states within specific population ranges. I’ll also discuss how to integrate Redis for caching query results to improve performance.
Why Flask, SQL Server, and Redis?
Flask is a lightweight and flexible Python web framework that is perfect for building small to medium-sized web applications. It provides the necessary tools to create RESTful APIs, render dynamic HTML templates, and interact with databases. On the other hand, SQL Server is a robust relational database management system (RDBMS) that is widely used in enterprise applications. Combining Flask with SQL Server allows us to build a powerful application for data analysis and visualization.
To further enhance performance, we’ll integrate Redis, an in-memory data store, to cache frequently accessed query results. This reduces the load on the database and speeds up response times for repeated queries.
Application Overview
Our Flask application performs the following tasks:
- Query population ranges. Users can specify a year and population range to get counts of states falling within those ranges.
- Fetch counties by state. Users can input a state code to retrieve a list of counties.
- Retrieve states by population range. Users can specify a population range and year to get a list of states within that range.
Note. To test the application, create your own schema in the database and insert sample data that matches the SQL queries used by the routes below. The HTML pages can be basic tables that display the data returned by the Flask code.
Let’s dive into the implementation details.
Setting Up the Flask Application
1. Prerequisites
Before starting, make sure the following are installed. The pip commands below were run from a macOS terminal:
- Python 3.x
- Flask (pip install flask)
- SQLAlchemy (pip install sqlalchemy)
- PyODBC (pip install pyodbc)
- Redis Python client (pip install redis)
2. Database Connection
We use SQLAlchemy to connect to the SQL Server database. Here’s how the connection can be configured:
from sqlalchemy import create_engine
import urllib.parse  # import the submodule explicitly; "import urllib" alone is not guaranteed to load it

# SQL Server connection string (placeholder server, user, and password;
# avoid committing real credentials to source control)
params = urllib.parse.quote_plus(
    "Driver={ODBC Driver 17 for SQL Server};"
    "Server=tcp:username.database.windows.net,1433;"
    "Database=population;"
    "Uid=user@username;"
    "Pwd={azure@123};"
    "Encrypt=yes;"
    "TrustServerCertificate=no;"
    "Connection Timeout=30;"
)
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
This connection string uses the ODBC Driver for SQL Server and includes parameters for encryption and timeout.
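The connection string above hard-codes its credentials. A minimal sketch of one alternative, reading them from environment variables instead; the function name `build_odbc_params` and the variable names `DB_SERVER`, `DB_NAME`, `DB_USER`, and `DB_PASSWORD` are assumptions for illustration, not part of the original application:

```python
import os
import urllib.parse


def build_odbc_params(env=os.environ):
    """Build a URL-quoted ODBC connection string from environment variables.

    The variable names used here are hypothetical; pick whatever fits
    your deployment.
    """
    raw = (
        "Driver={ODBC Driver 17 for SQL Server};"
        f"Server=tcp:{env['DB_SERVER']},1433;"
        f"Database={env['DB_NAME']};"
        f"Uid={env['DB_USER']};"
        f"Pwd={{{env['DB_PASSWORD']}}};"  # braces wrap the password, as in the original string
        "Encrypt=yes;"
        "TrustServerCertificate=no;"
        "Connection Timeout=30;"
    )
    return urllib.parse.quote_plus(raw)


# Usage (requires SQLAlchemy and pyodbc):
# from sqlalchemy import create_engine
# engine = create_engine(f"mssql+pyodbc:///?odbc_connect={build_odbc_params()}")
```

A missing variable raises a KeyError at startup, which is usually easier to diagnose than a failed login later on.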
3. Redis Configuration
Redis is used to cache query results. Here’s how to set up the Redis connection:
import redis

# Redis connection
redis_client = redis.StrictRedis(
    host='username.redis.cache.windows.net',
    port=6380,
    db=0,
    password='encryptedpasswordstring',
    ssl=True
)
4. Implementing the Application Routes
Home Page Route
The home page route renders the main page of the application:
from flask import Flask, render_template, request

app = Flask(__name__)

@app.route('/')
def index():
    return render_template('index.html')
Population Range Query With Redis Caching
This route handles queries for population ranges. It first checks if the result is cached in Redis. If not, it queries the database and caches the result for future use:
import json
from time import time
from sqlalchemy import text

@app.route('/population-range', methods=['GET', 'POST'])
def population_range():
    if request.method == 'POST':  # Input params defined for this API
        year = request.form['yr1']
        range1_start = request.form['r1']
        range1_end = request.form['r2']
        range2_start = request.form['r3']
        range2_end = request.form['r4']
        range3_start = request.form['r5']
        range3_end = request.form['r6']

        # Map year to column name
        year_map = {
            '2010': 'ten',
            '2011': 'eleven',
            '2012': 'twelve',
            '2013': 'thirteen',
            '2014': 'fourteen',
            '2015': 'fifteen',
            '2016': 'sixteen',
            '2017': 'seventeen',
            '2018': 'eighteen'
        }
        year_column = year_map.get(year, 'ten')  # Default to 'ten' if year not found

        # Build cache key
        cache_key = f"population_range_{year_column}_{range1_start}_{range1_end}_{range2_start}_{range2_end}_{range3_start}_{range3_end}"

        # Check if result is cached
        cached_result = redis_client.get(cache_key)
        if cached_result:
            result = json.loads(cached_result)  # Deserialize cached result (JSON instead of eval())
            time_taken = 0  # No database query, so time taken is negligible
            cache_status = "Cache Hit"
        else:
            # Build SQL query. year_column comes from the fixed map above, so it is safe
            # to interpolate; user-supplied values are passed as bound parameters.
            query = text(f"""
                SELECT
                    SUM(CASE WHEN {year_column} BETWEEN :r1_start AND :r1_end THEN 1 ELSE 0 END) AS range1_count,
                    SUM(CASE WHEN {year_column} BETWEEN :r2_start AND :r2_end THEN 1 ELSE 0 END) AS range2_count,
                    SUM(CASE WHEN {year_column} BETWEEN :r3_start AND :r3_end THEN 1 ELSE 0 END) AS range3_count
                FROM popul
            """)
            bind_params = {
                'r1_start': range1_start, 'r1_end': range1_end,
                'r2_start': range2_start, 'r2_end': range2_end,
                'r3_start': range3_start, 'r3_end': range3_end,
            }
            print(query)  # For debugging

            # Execute query and measure time
            start_time = time()
            with engine.connect() as conn:  # engine.execute() was removed in SQLAlchemy 2.0
                rows = conn.execute(query, bind_params).fetchall()
            end_time = time()
            time_taken = end_time - start_time
            cache_status = "Cache Miss"
            result = [list(row) for row in rows]  # Plain lists so the result is JSON-serializable

            # Cache the result
            redis_client.set(cache_key, json.dumps(result), ex=3600)  # Cache for 1 hour
        return render_template('display.html', data1=result, t1=time_taken, cache_status=cache_status)
    return render_template('index.html')
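The route above passes the form values into the query without checking them. A minimal sketch of validating a range first, assuming population bounds are non-negative integers; `parse_range` is a hypothetical helper, not part of the original application:

```python
def parse_range(start, end):
    """Convert a pair of form strings to an ordered (low, high) tuple of ints.

    Raises ValueError if either value is not an integer or is negative.
    """
    low, high = int(start), int(end)
    if low < 0 or high < 0:
        raise ValueError("population bounds must be non-negative")
    if low > high:
        low, high = high, low  # tolerate reversed input
    return low, high
```

In a route, a ValueError from this helper could be turned into a 400 response or an error message on the form, rather than reaching the database.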
Fetch Counties by State With Redis Caching
This route retrieves counties for a given state code. It also uses Redis to cache the results:
import json
from time import time
from sqlalchemy import text

@app.route('/counties-by-state', methods=['GET', 'POST'])
def counties_by_state():
    if request.method == 'POST':
        state_code = request.form['state_code']

        # Build cache key
        cache_key = f"counties_by_state_{state_code}"

        # Check if result is cached
        cached_result = redis_client.get(cache_key)
        if cached_result:
            result = json.loads(cached_result)  # Deserialize cached result (JSON instead of eval())
            time_taken = 0  # No database query, so time taken is negligible
            cache_status = "Cache Hit"
        else:
            # Build SQL query; the state code is passed as a bound parameter
            query = text("""
                SELECT county
                FROM dbo.county
                WHERE state = (SELECT state FROM codes WHERE code = :state_code)
            """)
            print(query)  # For debugging

            # Execute query and measure time
            start_time = time()
            with engine.connect() as conn:  # engine.execute() was removed in SQLAlchemy 2.0
                rows = conn.execute(query, {'state_code': state_code}).fetchall()
            end_time = time()
            time_taken = end_time - start_time
            cache_status = "Cache Miss"
            result = [list(row) for row in rows]  # Plain lists so the result is JSON-serializable

            # Cache the result
            redis_client.set(cache_key, json.dumps(result), ex=3600)  # Cache for 1 hour
        return render_template('counties.html', data=result, time_taken=time_taken, cache_status=cache_status)
    return render_template('index.html')
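Cached results have to round-trip through Redis as strings or bytes. One option, sketched here under the assumption that rows contain only JSON-friendly values (strings, numbers, None), is to serialize them as JSON, which never executes the cached content the way evaluating it as Python would. The helper names and the sample county names are illustrative only:

```python
import json


def dump_rows(rows):
    """Serialize an iterable of row-like sequences to a JSON string."""
    return json.dumps([list(row) for row in rows])


def load_rows(payload):
    """Deserialize a cached payload (str or bytes) back to a list of lists."""
    return json.loads(payload)


rows = [("Tarrant",), ("Dallas",)]  # stand-in for fetchall() output
payload = dump_rows(rows)
restored = load_rows(payload.encode("utf-8"))  # Redis returns bytes by default
```

Note that tuples come back as lists, so templates should only rely on indexing and iteration, not on the exact row type.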
Retrieve States by Population Range With Redis Caching
This route fetches states within a specified population range and caches the results:
import json
from time import time
from sqlalchemy import text

@app.route('/states-by-population', methods=['GET', 'POST'])
def states_by_population():
    if request.method == 'POST':
        year = request.form['year']
        population_start = request.form['population_start']
        population_end = request.form['population_end']

        # Map year to column name
        year_map = {
            '2010': 'ten',
            '2011': 'eleven',
            '2012': 'twelve',
            '2013': 'thirteen',
            '2014': 'fourteen',
            '2015': 'fifteen',
            '2016': 'sixteen',
            '2017': 'seventeen',
            '2018': 'eighteen'
        }
        year_column = year_map.get(year, 'ten')  # Default to 'ten' if year not found

        # Build cache key
        cache_key = f"states_by_population_{year_column}_{population_start}_{population_end}"

        # Check if result is cached
        cached_result = redis_client.get(cache_key)
        if cached_result:
            result = json.loads(cached_result)  # Deserialize cached result (JSON instead of eval())
            time_taken = 0  # No database query, so time taken is negligible
            cache_status = "Cache Hit"
        else:
            # Build SQL query. year_column comes from the fixed map above, so it is safe
            # to interpolate; user-supplied values are passed as bound parameters.
            query = text(f"""
                SELECT state
                FROM popul
                WHERE {year_column} BETWEEN :population_start AND :population_end
            """)
            bind_params = {'population_start': population_start, 'population_end': population_end}
            print(query)  # For debugging

            # Execute query and measure time
            start_time = time()
            with engine.connect() as conn:  # engine.execute() was removed in SQLAlchemy 2.0
                rows = conn.execute(query, bind_params).fetchall()
            end_time = time()
            time_taken = end_time - start_time
            cache_status = "Cache Miss"
            result = [list(row) for row in rows]  # Plain lists so the result is JSON-serializable

            # Cache the result
            redis_client.set(cache_key, json.dumps(result), ex=3600)  # Cache for 1 hour
        return render_template('states.html', data=result, time_taken=time_taken, cache_status=cache_status)
    return render_template('index.html')
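Both population routes repeat the same year-to-column mapping. A small sketch of sharing it at module level; the names `YEAR_COLUMNS` and `year_to_column` are hypothetical, while the mapping and the default of 'ten' follow the routes above:

```python
# Year-to-column mapping shared by the population routes.
YEAR_COLUMNS = {
    '2010': 'ten',
    '2011': 'eleven',
    '2012': 'twelve',
    '2013': 'thirteen',
    '2014': 'fourteen',
    '2015': 'fifteen',
    '2016': 'sixteen',
    '2017': 'seventeen',
    '2018': 'eighteen',
}


def year_to_column(year, default='ten'):
    """Return the column name for a year, or the default if the year is unknown."""
    return YEAR_COLUMNS.get(str(year), default)
```

Because the returned value always comes from this fixed mapping (or the default), it can be placed into a query string without exposing the column name to user input. Rejecting unknown years instead of defaulting would be a reasonable alternative.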
Performance Comparison: SQL Server vs. Redis
Query Type | Redis Fetch Time | SQL Execution Time |
---|---|---|
Population Range Query (Cached) | 0.002 seconds | 0.000 seconds |
Population Range Query (Fresh) | 0.002 seconds | 1.342 seconds |
Key takeaway: On a cache hit, Redis cuts response time from ~1.3 seconds to ~0.002 seconds, making repeated queries roughly 650x faster.
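Figures like those in the table can be collected with a small timing wrapper. The sketch below is one way to do it, not the measurement code used for the table; `timed` and `speedup` are hypothetical helpers:

```python
from time import perf_counter


def timed(fn, *args, **kwargs):
    """Call fn and return (result, elapsed_seconds)."""
    start = perf_counter()
    result = fn(*args, **kwargs)
    return result, perf_counter() - start


def speedup(slow_seconds, fast_seconds):
    """How many times faster the fast path is than the slow path."""
    return slow_seconds / fast_seconds
```

With the exact table values, speedup(1.342, 0.002) comes to about 671; rounding the SQL time down to 1.3 seconds gives the 650x figure quoted above. Wrapping the Redis lookup in `timed` as well would report the real cache-hit latency instead of a fixed 0.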
How Redis Improves Performance
Redis is an in-memory data store that acts as a caching layer between the application and the database. Here’s how it works in our application:
- Cache key. A unique key is generated for each query based on its parameters.
- Cache check. Before executing a database query, the application checks if the result is already cached in Redis.
- Cache hit. If the result is found in Redis, it is returned immediately, avoiding a database query.
- Cache miss. If the result is not found, the query is executed, and the result is cached in Redis for future use.
- Cache expiry. Cached results are set to expire after a specified time (e.g., 1 hour) to ensure data freshness.
By caching frequently accessed query results, Redis significantly reduces the load on the database and improves response times for repeated queries.
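The steps above can be sketched as a single helper. This is a minimal illustration, assuming results are JSON-serializable; `get_or_compute` and `FakeRedis` are hypothetical names, and `FakeRedis` is only an in-memory stand-in so the example runs without a Redis server:

```python
import json


def get_or_compute(client, key, compute, ttl_seconds=3600):
    """Return (value, cache_status) using the check, hit, miss, store steps.

    `client` needs Redis-style get(key) and set(key, value, ex=seconds).
    """
    cached = client.get(key)
    if cached is not None:
        return json.loads(cached), "Cache Hit"
    value = compute()  # cache miss: run the expensive query
    client.set(key, json.dumps(value), ex=ttl_seconds)
    return value, "Cache Miss"


class FakeRedis:
    """In-memory stand-in for a Redis client; it ignores the expiry argument."""

    def __init__(self):
        self.store = {}

    def get(self, key):
        return self.store.get(key)

    def set(self, key, value, ex=None):
        self.store[key] = value


client = FakeRedis()
calls = []


def expensive_query():
    calls.append(1)  # count how often the "database" is hit
    return [["Texas"], ["Ohio"]]  # sample data for illustration


key = "states_by_population_ten_1_100"
first = get_or_compute(client, key, expensive_query)
second = get_or_compute(client, key, expensive_query)
```

The second call is served from the cache, so `expensive_query` runs only once. Passing the real `redis_client` in place of `FakeRedis` would add the one-hour expiry through the `ex` argument.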
Conclusion
In this article, we built a Flask application that interacts with a SQL Server database to analyze population data. We integrated Redis to cache query results, improving performance and reducing database load. By following best practices such as parameterized queries and keeping credentials out of source code, you can extend this application to handle more complex queries and scale it for production use.
Link: The source code of this full application can be found on GitHub.
Opinions expressed by DZone contributors are their own.