Python Asyncio Tutorial: A Complete Guide
Learn to optimize test execution time and manage coroutines for efficient concurrency with Python asyncio, ideal for developers looking to streamline workflows.
Test execution time plays a key role in speeding up releases, especially when testing at scale. It largely depends on how well the test suites are designed, their ability to run tests concurrently, and the efficiency of the test infrastructure used.
When fetching build details for the last 500 tests, large API responses can delay execution. To avoid blocking other tasks, running API calls asynchronously is ideal, improving overall test efficiency. In this Python asyncio tutorial, we will dive deep into the nuances of asynchronous programming with Python using the asyncio (asynchronous I/O) library that was introduced in Python 3.4. The learnings of this Python asyncio tutorial will help you make the most of coroutines, tasks, and event loops for realizing concurrent execution.
Note: Async IO, AsyncIO, and asyncio are used interchangeably throughout this Python asyncio tutorial.
What Is Asynchronous Programming in Python?
As the name indicates, asynchronous programming is an approach where different tasks can be executed concurrently. This essentially means that the main (or single) thread need not be blocked while other tasks are performing I/O operations, making HTTP requests, and more.
Tasks waiting for I/O operations or network requests do not block the other tasks, thereby minimizing idle time and reducing the overall execution time. The Python asyncio library allows concurrency by using coroutines that run in an event loop, which is itself executed in a single thread. As stated in the official documentation of asyncio, the implementation of this library, which was previously called Tulip, has been a part of the Python standard library since Python 3.4. If you are running a Python version earlier than 3.4 (which is not recommended), you can install the Python asyncio library by triggering the pip install asyncio command on the terminal.
Here are some of the use cases where asynchronous execution in Python asyncio can be highly beneficial:
- Web applications (e.g., streaming, e-commerce, etc.) that need to handle a large number of simultaneous requests
- Web applications using REST APIs that involve I/O operations (e.g., handling HTTP requests and responses)
- Web applications using the Microservices architecture, where asynchronous execution can help accelerate handling network calls, interacting with databases, and more.
In further sections of the Python asyncio tutorial, we will be deep-diving into the core concepts of the Python asyncio library, i.e., coroutines, event loops, tasks, and the async/await syntax.
Essentials of Python Asyncio
The async and await keywords form the fundamentals of asynchronous programming in Python via the Python asyncio library. With Python asyncio, a normal function (i.e., def function_name) becomes an asynchronous (or coroutine) function using the async keyword (i.e., async def function_name). The async def change lets the current function temporarily pause its execution while the respective operations (e.g., I/O, network requests, etc.) are in progress.
Control is yielded to the event loop when a coroutine (or task) encounters the await keyword, or when a coroutine awaits another coroutine or future. Let’s look at each of the components of the Python asyncio library in more detail.
Coroutines
In the context of the Python asyncio library, coroutines can be defined as functions that can temporarily pause their execution while waiting for blocking, time-consuming operations like I/O, network requests, file operations, database operations, etc. This lets other tasks execute concurrently in the meantime. In simple terms, the CPU is underutilized (or might be free) while I/O (or similar) operations are in progress.
For instance, copying data to an external hard drive is an I/O operation where the CPU only initiates and accepts the I/O requests. The CPU can be better utilized in such cases to perform other tasks. The same rationale also applies to coroutines in the Python asyncio library. A normal function in Python becomes a coroutine function when it is defined with the async def syntax. Calling a function defined with async def yields a coroutine object. When the await keyword is encountered, the current coroutine is paused, and control is yielded back to the event loop. The event loop continuously monitors the awaitable (e.g., a coroutine, Task, or Future) until completion.
Once the execution of the awaitable or the newly picked-up task is complete, the event loop resumes the execution of the paused coroutine. It is important to note that coroutines do not make the code multi-threaded; rather, coroutines run in an event loop that executes in a single thread. Shown below is an example showcasing the usage of coroutines in Python:
import asyncio
import sys
import time
from datetime import datetime

async def test_1():
    # Get function name
    # https://stackoverflow.com/questions/5067604/determine-function-name-from-within-that-function
    print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    # Could be an I/O operation, network request, database operation, and more
    await asyncio.sleep(2)
    ret_info = await test_2()
    print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    return "test_1"

async def test_2():
    print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    await asyncio.sleep(2)
    print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    return "test_2"

async def main():
    print("Enter main")
    start_time = time.perf_counter()
    # Execution is paused since the await keyword is encountered.
    # Control is yielded back to the event loop, and another coroutine (if any) is executed.
    # Control is handed back to test_1 once the sleep of 2 seconds is completed.
    ret_info = await test_1()
    print(f"Data received from the test_1: {ret_info}" + " " + str(datetime.now().time()))
    ret_info = await test_2()
    print(f"Data received from the test_2: {ret_info}" + " " + str(datetime.now().time()))
    end_time = time.perf_counter()
    print("Exit main")
    print(f'It took {round(end_time - start_time, 0)} second(s) to complete.')

if __name__ == '__main__':
    # Run the main coroutine
    asyncio.run(main())
test_1() and test_2() are defined as asynchronous functions (or coroutines). During the execution of test_1(), the await keyword is encountered with an async sleep of 2 seconds; this pauses the coroutine and yields control back to the event loop. test_1() then awaits test_2(), so its execution remains paused until the test_2() coroutine completes. Once test_2() finishes, the test_1() coroutine resumes, and the return value of test_1 is printed on the terminal.
Lastly, asyncio.run() is used to run the main coroutine until its completion. asyncio.run() also sets up the event loop, executes the coroutine, and closes the event loop when main finishes.
Event Loop
The event loop in the Python asyncio library primarily manages the scheduling of asynchronous tasks, callbacks, I/O operations, and more. As stated earlier, the event loop manages and schedules asynchronous operations without blocking the main thread. Since the event loop runs continuously, it monitors the awaitable (e.g., a coroutine, task, or future) until its execution is complete. As soon as the await keyword is encountered, the current coroutine is temporarily paused, and control is yielded to the event loop.
Once the execution of the awaited task (or awaitable) is complete, the event loop resumes the execution of the paused coroutine. In a nutshell, the event loop in the Python asyncio library plays a pivotal role in enabling the asynchronous (or concurrent) execution of tasks. Shown below is an example showcasing an event loop in Python:
import asyncio
import sys
import time
from datetime import datetime

async def test_1():
    # Get function name
    # https://stackoverflow.com/questions/5067604/determine-function-name-from-within-that-function
    print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    # Could be an I/O operation, network request, database operation, and more
    await asyncio.sleep(2)
    print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    return "test_1"

async def test_2():
    print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    # Sleep of 2 seconds
    await asyncio.sleep(2)
    print("Exit " + sys._getframe().f_code.co_name)
    return "test_2"

async def main():
    print("Enter main")
    start_time = time.perf_counter()
    # Await test_1
    ret_info = await test_1()
    print(f"Data received from the test_1: {ret_info}" + " " + str(datetime.now().time()))
    # Await test_2
    ret_info = await test_2()
    print(f"Data received from the test_2: {ret_info}" + " " + str(datetime.now().time()))
    print("Exit main")
    end_time = time.perf_counter()
    print(f'It took {round(end_time - start_time, 0)} second(s) to complete.')

if __name__ == '__main__':
    # Explicitly create a new event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main())
    loop.close()
A new event loop object is created using the new_event_loop() method of the Python asyncio library. As stated in this Stack Overflow question, creating a new event loop is required if the event loop needs to run outside the main thread or a custom policy needs to be used in a single application. set_event_loop(loop) sets the newly created loop (named loop) as the current event loop, ensuring that the get_event_loop() method returns this loop.
In case you encounter the DeprecationWarning: There is no current event loop warning, we would suggest going through this Stack Overflow thread for more information. The execution output shows that the test_1() coroutine was paused when the await keyword was encountered in the code. After the async sleep of 2 seconds, the second coroutine test_2() is executed until its completion. The total execution time is 4 seconds.
The close() method of the Python asyncio library closes the event loop (named loop) created earlier once all the tasks are completed. Next, we see how we can further leverage tasks in Python asyncio for running coroutines on the event loop at the same time.
Tasks
So far, we have seen that the await keyword is used to suspend the execution of the current coroutine until the execution of the awaitable (a coroutine, task, or future) is complete. Hence, it is used for cooperative multitasking, whereby multiple coroutines can run concurrently in a single-threaded environment. However, tasks are a more efficient way of managing the concurrent execution of coroutines.
A task is a wrapper around a coroutine that is scheduled for execution by the event loop. Though both sound very similar, there is a thin line of difference when you opt for tasks instead of merely using the await keyword.
As seen in the earlier example for the event loop, the current coroutine (e.g., test_1()) yields control back to the event loop once the await keyword is encountered. As soon as this occurs, the current coroutine is temporarily paused (or suspended), and the event loop picks up the next available coroutine/task/future (e.g., test_2()) for execution. Once the execution of the awaited coroutine (i.e., test_2()) is complete, the suspended coroutine (i.e., test_1()) resumes execution.
However, tasks let you manage the concurrent execution of coroutines more efficiently. Unlike coroutines that are paused at each await, tasks start running as soon as they are created, without the caller having to wait for them to complete. Tasks in the Python asyncio library let you run/schedule multiple coroutines concurrently on a single thread.
Let’s consider the earlier example where we have two coroutines: test_1() and test_2(). When converted into tasks, both can run concurrently instead of one waiting for the other to complete execution. In a nutshell, tasks wrapping the coroutines test_1() and test_2() not only run in tandem, but the event loop can also utilize the CPU more efficiently.
Coroutines can be wrapped into a task by invoking the create_task() method, which returns a Task object. As stated in the official documentation, the wait_for() method of the Python asyncio library waits for a single future (or coroutine) to complete with a timeout.
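The timeout behavior of wait_for() can be sketched as follows. This is a minimal illustration, not from the original article; the coroutine name slow_operation() and the timeout values are made up for the sketch:

```python
import asyncio

async def slow_operation():
    # Stand-in for a long-running I/O operation (hypothetical)
    await asyncio.sleep(2)
    return "done"

async def main():
    try:
        # Give up if slow_operation() does not finish within 1 second
        result = await asyncio.wait_for(slow_operation(), timeout=1)
        print(result)
    except asyncio.TimeoutError:
        # wait_for() cancels the awaited coroutine on timeout
        print("Timed out!")

if __name__ == '__main__':
    asyncio.run(main())
```

Since the sleep (2 seconds) exceeds the timeout (1 second), the awaited coroutine is cancelled and asyncio.TimeoutError is raised in the caller.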
On a lighter note, tasks and await in the Python asyncio library are two sides of the same coin. With await, control is transferred back to the event loop, which schedules the next awaitable to be run. Though this does provide concurrency, there is still massive room for performance improvement.
This is where tasks come into the picture: this wrapper around a coroutine lets you run multiple coroutines concurrently. The usage of tasks improves the overall efficiency of the event loop, thereby improving the performance and responsiveness of the code. The example shown in the Event Loop section is ported below so that the normal coroutines are converted into tasks.
import asyncio
import sys
import time
from datetime import datetime

async def test_1():
    print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    await asyncio.sleep(2)
    print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    return "test_1"

async def test_2():
    print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    await asyncio.sleep(2)
    print("Exit " + sys._getframe().f_code.co_name)
    return "test_2"

async def main():
    print("Enter main")
    start_time = time.perf_counter()
    # Create tasks for concurrent execution
    task1 = asyncio.create_task(test_1())
    task2 = asyncio.create_task(test_2())
    # Await both tasks
    ret_info_1 = await task1
    print(f"Data received from test_1: {ret_info_1} " + str(datetime.now().time()))
    ret_info_2 = await task2
    print(f"Data received from test_2: {ret_info_2} " + str(datetime.now().time()))
    print("Exit main")
    end_time = time.perf_counter()
    print(f'It took {round(end_time - start_time, 0)} second(s) to complete.')

if __name__ == '__main__':
    # Run the main coroutine
    asyncio.run(main())
There are two simple coroutines, test_1() and test_2(), with a sleep of 2 seconds added in each of them. In main(), the two coroutines are wrapped as tasks by invoking the create_task(co_name) method of the Python asyncio library. Both tasks are scheduled on the event loop as soon as they are created. The await keyword (on the task wrapping test_1()) pauses the current coroutine, and the event loop can run the other task (wrapping test_2()) during the sleep period. The execution time is 2 seconds (which was earlier 4 seconds) since task2 runs concurrently with task1.
Tasks should be prioritized over normal coroutines if there is no dependency (e.g., result of one coroutine used in the other one) between coroutines. Invoking multiple API requests (e.g., LambdaTest APIs) and clubbing the results, performing parallel I/O operations, logging data, and running background tasks are some of the scenarios where tasks should be preferred over coroutines. On the whole, concurrent execution of tasks reduces the overall execution time along with improving the efficiency of the code.
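The background-task scenario mentioned above can be sketched as follows. This is a hedged illustration, not from the original article; the function name log_results() and the queue-based hand-off are assumptions made for the sketch:

```python
import asyncio

async def log_results(queue):
    # Hypothetical background logger that drains a queue of results
    while True:
        item = await queue.get()
        if item is None:  # Sentinel value asks the logger to stop
            break
        print(f"Logged: {item}")

async def main():
    queue = asyncio.Queue()
    # Start the logger immediately; main() does NOT wait for it here
    logger_task = asyncio.create_task(log_results(queue))
    # Simulate test steps producing results while the logger runs concurrently
    for result in ("test_1 passed", "test_2 passed"):
        await queue.put(result)
        await asyncio.sleep(0.1)
    await queue.put(None)   # Signal the logger to finish
    await logger_task       # Only now wait for the background task to exit

asyncio.run(main())
```

The key point is that create_task() schedules log_results() right away, so logging happens in the background while main() continues with its own work.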
Running Tasks Concurrently Using asyncio.gather()
There could be scenarios where you would want a series of awaitables (e.g., tasks) to be executed concurrently, with the result being an aggregate list of the returned values. One such example is testing multiple API endpoints in a single go, where the API responses are aggregated in a list. Similarly, batch processing (i.e., extracting and processing data concurrently) is much more efficient using the asyncio.gather() method. As stated in the official documentation, asyncio.gather() lets you run the awaitable objects in the aws sequence concurrently.
awaitable asyncio.gather(*aws, return_exceptions=False)
All coroutines (i.e., in aws) are automatically scheduled as tasks. The coroutines passed to the gather() method are executed concurrently, and the results are retrieved after the completion of all the coroutines. In case two coroutines (e.g., coroutine_1() and coroutine_2()) are passed to the gather() method, they are first converted into tasks and executed concurrently. The return value is a list of results in the order of the original sequence, not necessarily the order in which the results arrived. The return_exceptions parameter, which is False by default, lets you tweak the manner in which raised exceptions are handled during the gather operation. We will cover return_exceptions in more detail in the further sections of this Python asyncio tutorial. Shown below is a port of the create_task() example which we demoed earlier. The two coroutines, coroutine_1() and coroutine_2(), are inherently converted into tasks by the asyncio.gather() method.
import asyncio
import sys
import time
from datetime import datetime

async def coroutine_1():
    print("Enter asyncio.gather " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    await asyncio.sleep(2)
    print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    return "coroutine_1"

async def coroutine_2():
    print("Enter asyncio.gather " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    await asyncio.sleep(2)
    print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
    return "coroutine_2"

async def main():
    print("Enter main")
    start_time = time.perf_counter()
    # Use asyncio.gather to run coroutine_1 and coroutine_2 concurrently
    ret_info_1, ret_info_2 = await asyncio.gather(
        coroutine_1(),
        coroutine_2(),
        return_exceptions=True
    )
    print(f"[asyncio.gather] Data received from coroutine_1: {ret_info_1} " + str(datetime.now().time()))
    print(f"[asyncio.gather] Data received from coroutine_2: {ret_info_2} " + str(datetime.now().time()))
    print("Exit main")
    end_time = time.perf_counter()
    print(f'It took {round(end_time - start_time, 0)} second(s) to complete.')

if __name__ == '__main__':
    asyncio.run(main())
The coroutines/tasks are scheduled and run concurrently by the gather() method of the Python asyncio library. The return values of the task executions are captured in the variables ret_info_1 and ret_info_2, respectively. Apart from gathering the tasks, the rest of the execution and test logic remains unchanged. The total execution time is 2 seconds even though the execution of both the concurrently-running coroutines was paused for 2 seconds using the asyncio.sleep() method.
To summarize, gather() in Python asyncio helps improve performance by reducing wait times, provides robust error handling, and collects the results in a list.
Handling Exceptions in asyncio.gather()
Exceptions could occur during execution: either an exception is raised in one or more tasks, or a task is canceled, raising the CancelledError exception. The manner in which exceptions are handled is controlled by the return_exceptions parameter in gather(). By default, return_exceptions is False; hence, the first raised exception is immediately propagated to the task that awaits on gather().
In the snippet below, a list consisting of four coroutines is passed to gather(). Since return_exceptions is set to False, the first exception raised by any coroutine/task is immediately propagated to the code awaiting the gather() call. In case an exception is raised in coroutine_1, the await on gather() fails with that exception; note that, as per the official documentation, the other awaitables in the sequence (i.e., coroutine_2, coroutine_3, and coroutine_4) are not cancelled and continue to run, but their results are no longer gathered. If return_exceptions is True, all the tasks (or coroutines) complete their execution, even if one or more tasks raise an unhandled exception.
Exceptions raised, if any, are provided as return values in the results list returned from gather(). Like the earlier scenario, coroutine_1 raises an exception. Since this time around return_exceptions is set to True, the exception raised by coroutine_1 is added to the returned list, and all the other awaitables (i.e., coroutine_2, coroutine_3, and coroutine_4) complete their execution.
In the example below, we have four coroutines that are passed as a list to the asyncio.gather() method. coroutine_1() raises a ValueError exception, whereas coroutine_3() raises a SystemError exception.
import asyncio
import sys

async def coroutine_1():
    await asyncio.sleep(2)
    raise ValueError(sys._getframe().f_code.co_name + " failed with ValueError")
    return "coroutine_1 finished."  # Unreachable: the raise above exits first

async def coroutine_2():
    await asyncio.sleep(2)
    return "coroutine_2 finished."

async def coroutine_3():
    await asyncio.sleep(2)
    raise SystemError(sys._getframe().f_code.co_name + " failed with SystemError")
    return "coroutine_3 finished."  # Unreachable: the raise above exits first

async def coroutine_4():
    await asyncio.sleep(2)
    return "coroutine_4 finished."

async def main(return_exceptions_val):
    try:
        results = await asyncio.gather(
            coroutine_1(), coroutine_2(), coroutine_3(), coroutine_4(),
            return_exceptions=return_exceptions_val
        )
        print(results)
    except ValueError:
        print("Value Error raised.")

print("Running with return_exceptions = False")
asyncio.run(main(return_exceptions_val=False))

print("\nRunning with return_exceptions = True")
asyncio.run(main(return_exceptions_val=True))
When return_exceptions in the gather() method is set to False, coroutine_1 raises ValueError, which is propagated to the await in main() and caught by the except block; the results of coroutine_2, coroutine_3, and coroutine_4 are never gathered. The execution story is entirely different when return_exceptions is set to True. Here, all four coroutines complete their execution even though ValueError and SystemError are raised by coroutine_1 and coroutine_3, respectively.
With return_exceptions set to True, the raised exceptions are provided as return values in the list returned from the gather() method of the Python asyncio library.
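A common follow-up, not shown in the snippet above, is to split the returned list into successes and failures by checking each entry with isinstance(). The coroutine names ok() and boom() below are made up for this sketch:

```python
import asyncio

async def ok():
    return "ok"

async def boom():
    raise ValueError("boom failed")

async def main():
    # With return_exceptions=True, exceptions land in the results list
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            print(f"Failed: {result}")
        else:
            print(f"Succeeded: {result}")

asyncio.run(main())
```

This pattern keeps one failing task from discarding the results of the others while still surfacing every error.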
Async/Await in Asyncio
We have used the async/await combination extensively throughout this Python asyncio tutorial. To put it in simple terms, async/await are the guiding pillars for realizing concurrent code execution with the Python asyncio library. The async keyword converts a Python function into a coroutine function, and calling it returns a coroutine object that is run by the event loop. What this essentially means is that the coroutine can momentarily pause its execution under the following circumstances:
- Waiting for I/O operations - making network requests, interacting with databases, and more.
- Waiting for external events - specific test conditions before proceeding with actions, monitoring and logging server-side issues, and more.
- Achieving better concurrency - yielding control to the event loop when there are waits (or sleeps), running multiple coroutines using asyncio.gather().
Coroutines can pause their execution using the await keyword. The await keyword suspends the currently executing coroutine, and control is yielded to the event loop. The suspended coroutine/task is scheduled for execution again when the awaited operation (i.e., I/O, timer expiry, etc.) completes. With the current task suspended, the event loop schedules and executes coroutines that are ready for execution. Once the awaited task is completed, the earlier suspended coroutine resumes execution from the point where it was paused. Now that we have covered the major aspects of the Python asyncio library, let’s look at an example that showcases the usage of the async/await keywords in Python:
# Demonstration of asyncio with Python (Pytest is the automation framework)
# Includes the following:
# Usage of aiohttp
# Usage of asyncio.gather
# Marking tests as async using the @pytest.mark.asyncio marker
import pytest
import aiohttp
import asyncio
import json
import ssl
import os
import sys
from dotenv import load_dotenv
import certifi

load_dotenv()

user_name = os.getenv('LT_USERNAME')
api_key = os.getenv('LT_ACCESS_KEY')

# Inspiration - https://stackoverflow.com/questions/53199248/get-json-using-python-and-asyncio
async def get_top_reddit_threads(subreddit, session):
    url = f"https://www.reddit.com/r/{subreddit}/top.json?sort=top&t=day&limit=20"
    # Reference JSON - https://www.reddit.com/r/Playwright/top.json?sort=top&t=day&limit=20
    data = await get_json(session, url)
    if data:
        data_decoded = json.loads(data.decode('utf-8'))
        print(f'\nReddit details for {subreddit}')
        print(f'____________________________\n')
        for post in data_decoded['data']['children']:
            score = post['data']['score']
            title = post['data']['title']
            link = post['data']['url']
            if score and title and link:
                print(f'Score: {score} | Title: {title} | Link: ({link})')

# Fetch JSON data from a URL
async def get_json(session, url):
    headers = {"accept": "application/json"}
    try:
        async with session.get(url, headers=headers) as response:
            # Response 200 - We have the data!
            assert response.status == 200
            return await response.read()
    except aiohttp.client_exceptions.ClientConnectorCertificateError as e:
        print(f"SSL Certificate Error: {e}")
        return None
    except Exception as e:
        print(f"Error fetching data: {e}")
        return None

# Refer LambdaTest API documentation - https://www.lambdatest.com/support/api-doc/
async def get_lambdatest_sessions(session):
    url = f"https://{user_name}:{api_key}@api.lambdatest.com/automation/api/v1/sessions?limit=40"
    data = await get_json(session, url)
    if data:
        data_decoded = json.loads(data.decode('utf-8'))
        for test in data_decoded['data']:
            test_id = test['test_id']
            build_name = test['build_name']
            status_ind = test['status_ind']
            print(f"Build: {build_name} | ID: {test_id} | Status: {status_ind}")

@pytest.mark.asyncio
async def test_fetch_lambdatest_sessions():
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
        await get_lambdatest_sessions(session)

@pytest.mark.asyncio
async def test_fetch_reddit_threads():
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
        subreddits = ['Selenium', 'Playwright', 'Python', 'asyncio']
        tasks = [get_top_reddit_threads(subreddit, session) for subreddit in subreddits]
        # Gather the tasks using the gather() method of asyncio
        await asyncio.gather(*tasks)
In the above example, we have two async functions:
Test - Async Function (Coroutine) | Purpose
---|---
test_fetch_lambdatest_sessions() | Fetches details of the sessions of the tests executed on LambdaTest.
test_fetch_reddit_threads() | Fetches top Reddit threads matching certain topics using the API provided by Reddit.
Let’s deep dive into the integral aspects of the code. To get started, we import all the essential libraries and modules: pytest, asyncio, aiohttp, and others that are used for asynchronous programming, test execution, and more. Since the LambdaTest and Reddit APIs provide output in the JSON format, the json module is also imported into the code. Since LambdaTest APIs are used in the tests, the user name and access key obtained from the LambdaTest Profile > Password and Security section are exposed as the LT_USERNAME and LT_ACCESS_KEY environment variables, respectively.
There are five async functions (or coroutines) in the example, of which three are helper functions used by the respective tests:
1. Coroutine 1 (Helper): get_json()
This coroutine fetches the data in JSON format from the given URL. First, we create a dictionary containing the accept header, which specifies that the client expects a response in the JSON format.
Next, an asynchronous GET request is made to the specified URL (i.e., the LambdaTest URL or Reddit API), including the headers. The status attribute of the response object indicates whether the request was successful or not. Any response other than 200 means that the client's request to the server was unsuccessful. The read() method of the response object returns the content of the response. The try…except block is used to catch and handle exceptions.
2. Coroutine 2 (Helper): get_lambdatest_sessions()
This particular coroutine returns the metadata related to test sessions for the tests executed on the LambdaTest platform. We are using the LambdaTest REST API to fetch session details for the last 40 tests executed on LambdaTest.
In order to use LambdaTest, you need to create an account and export the LT_USERNAME and LT_ACCESS_KEY environment variables mentioned earlier. Since we need session information for only the last 40 sessions, the limit parameter in the LambdaTest sessions API is set to 40. The LambdaTest user name and access key are appended to the URL for authentication purposes.
Next, the HTTP session object and the constructed URL are supplied as input parameters to the get_json() coroutine that we discussed earlier. The get_json() coroutine returns the LambdaTest session metadata in the JSON format. Once we have the byte-encoded data, it is decoded into a UTF-8 string. The loads() method of the json module parses the JSON string into a Python dictionary. Once we have the JSON-decoded dictionary, we loop through each item in the data key.
As seen from the API response, each test session comprises a unique test_id, a build_name, and a status_ind indicator. All these respective entries for each test session are printed on the console.
3. Coroutine 3 (Helper): get_top_reddit_threads()
This helper function provides the top Reddit topics for a particular subreddit (e.g., 'Selenium', 'Playwright', 'Python', 'asyncio'). The subreddit is passed as an argument to the get_top_reddit_threads() coroutine. Like the other helper functions, we first construct the URL along with the query parameters sort=top, t=day, and limit=20 to get the top 20 posts of the day.
The get_json() helper is invoked here as well to perform the GET request on the URL created in the earlier step. The byte-encoded response data is then decoded into a UTF-8 string, and the loads() method parses the JSON string into a Python dictionary. Now that we have the decoded JSON, we loop through each item in the children list, which is under the data key. Each item represents a Reddit post. For verification, just head over to the Python subreddit URL (for Python), and you will see that the entries score, title, and link are present for each item in the children list under the data key.
The meta-data (i.e., score, title, and sub-reddit link) are finally printed on the console. With the helper functions all covered, let’s look at the test functions/coroutines used in the example:
Coroutine 1 (Test Function): test_fetch_lambdatest_sessions()
Since this is an asynchronous test function (or coroutine), it is marked with the @pytest.mark.asyncio decorator. With this, the test_fetch_lambdatest_sessions() function is executed as an asyncio task in the event loop provided by pytest-asyncio.
Next, a default SSL context is created using the create_default_context() method of the ssl library. During the implementation, we came across a few errors related to SSL verification, and the Stack Overflow thread on SSL in Python helped resolve those errors.
The path to the system CA certificate bundle is provided via the certifi.where() method. The certificate bundle is then passed to the SSL context to ensure a secure connection.
ClientSession is the entry point for all the client API operations. Since we are using custom SSL parameters, an ssl.SSLContext instance is created and used for the entire session with ClientSession(connector=TCPConnector(ssl=ssl_context)). The connector parameter in ClientSession is set to aiohttp.TCPConnector with the ssl parameter set to ssl_context. With this, the session uses the SSL context (ssl_context) for secure connections.
Finally, the helper function/coroutine get_lambdatest_sessions()
is invoked with the newly created session
passed as a parameter to it. As seen from the example, a new session object is created for each test; this could be further optimized by using pytest fixtures with asyncio.
Coroutine 2 (Test Function): test_fetch_reddit_threads()
Most of the implementation of test_fetch_reddit_threads()
remains the same as that of test_fetch_lambdatest_sessions()
, barring a few changes. First, we create a list of all the subreddits (i.e., Selenium, Playwright, Python, and asyncio) whose top threads need to be fetched. Next up, we create a list of tasks/coroutine objects by invoking the helper get_top_reddit_threads()
for each subreddit in the list. The session
object that was created in the earlier step is passed in the helper function along with the subreddit. All four tasks run concurrently via the gather()
method of the Python asyncio
library. The return_exceptions
parameter in asyncio.gather()
is set to False (the default), which means that the first exception raised by any task is immediately propagated to the caller awaiting gather(); the remaining awaitables are not cancelled and continue to run.
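The effect of return_exceptions can be observed with a minimal, self-contained sketch (the ok()/boom() coroutines are illustrative, not part of the example above). With return_exceptions=True, exceptions are collected into the results list instead of being propagated:

```python
import asyncio

async def ok():
    return "ok"

async def boom():
    raise ValueError("boom")

async def main():
    # With return_exceptions=True, exceptions are returned in the results
    # list instead of being raised in the awaiting caller
    return await asyncio.gather(ok(), boom(), return_exceptions=True)

results = asyncio.run(main())
# results[0] is "ok"; results[1] is the ValueError instance itself
```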
Execution
Invoke the following command on the terminal to execute the tests: pytest --verbose --capture=no tests/sample-examples/5_async_await.py
As seen in the execution snapshot, both the tests executed successfully, and the top 20 LambdaTest test sessions and the top 40 Reddit threads are printed on the console.
To summarize, the combination of async/await, tasks, and more can be leveraged to handle concurrent execution and free up the CPU when I/O bound operations, network requests, etc. are in progress. All of this allows the application to remain responsive while accelerating the speed of test execution.
How to Make Python Code Asynchronous?
Before getting into how to make the code asynchronous, it is important to identify the scenarios for which async
should be preferred over the sync
mode. Here are some considerations that should be taken into account when opting for asynchronous code in Python:
Identify Operations Involving External Resources
By now, it is evident that asynchronous execution is the way to go when the application involves operations with external resources: network requests, database queries, I/O, etc. In such scenarios, the CPU is less loaded, which allows it to pick up other tasks that require its attention. Here, we are not referring to CPU-bound tasks, for which Python asyncio has to be integrated with executors to improve application responsiveness and performance. For CPU-bound tasks or blocking I/O, Python asyncio can be used with ThreadPoolExecutor
for offloading tasks from the asyncio event loop. Also, Python asyncio with ProcessPoolExecutor
offers the benefits of parallelism by making the best use of multi-CPU cores.
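A minimal sketch of offloading blocking work to a ThreadPoolExecutor from the event loop follows; the blocking_io() function is an illustrative stand-in for a blocking call, not code from the article:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(n):
    # Stand-in for a blocking call (file read, legacy HTTP client, etc.)
    time.sleep(0.05)
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # Offload the blocking calls to worker threads so the event loop
        # remains free to schedule other coroutines in the meantime
        futures = [loop.run_in_executor(pool, blocking_io, i) for i in range(3)]
        return await asyncio.gather(*futures)

results = asyncio.run(main())
```

The same pattern works with ProcessPoolExecutor when the work is CPU-bound rather than blocking I/O.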
Concurrent Test Execution
Secondly, asynchronous execution should be opted for if the application has independent tasks that can be executed concurrently. We looked into one such scenario where we made multiple API calls (i.e., LambdaTest APIs and Reddit APIs) in parallel, and async
/await
and tasks helped in faster test execution. Libraries like Python asyncio
, aiohttp
, aiomysql
, aiopg
, etc., can be leveraged to concurrently execute applications built using the event-driven microservices architecture.
You can also refer to our blog on Microservices Design Principles for a quick refresher on design patterns that best suit your microservices-based application.
Mark Functions as Coroutines
Functions with I/O-bound operations are the ideal contenders for asynchronous execution. These functions can be converted into coroutines using the async
/await
keywords.
Replacing blocking calls with await
allows other tasks to be executed while the awaited operation is in progress. As seen above, two simple tests that sleep for 2 seconds execute concurrently using the gather()
method of the Python asyncio
library. While the sync
equivalent of the above snippet takes 6 seconds to execute, the async
variant executes in 4 seconds.
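The pattern can be reproduced with a small self-contained sketch; the coroutine names and sleep durations here are illustrative, not the exact snippet from the article:

```python
import asyncio
import time

async def sleepy(name, secs):
    await asyncio.sleep(secs)
    return name

async def main():
    start = time.perf_counter()
    # Both sleeps overlap, so the total wall-clock time is close to the
    # longest individual sleep rather than the sum of both
    results = await asyncio.gather(sleepy("test_1", 1), sleepy("test_2", 1))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
```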
To summarize, it is recommended to accelerate execution time with asyncio by replacing tasks/functions that involve operations pertaining to I/O, servers, databases, and more.
Demonstration: Asyncio in Python
Now that we have covered most of the essential concepts of Python asyncio
, let’s dive deep into real-world scenarios. Before doing the same, let’s set up the project and execution environment.
Project Structure
The project structure is where the tests demonstrating the usage of the Python asyncio
library are located in the tests folder. Let’s do a deep dive into the project structure:
- pageobject - Contains locators used in the respective tests. Primarily created for realizing the needs of Page Object Model in Selenium Python.
- tests/fetching_pokemon_names - Sync and async fetching of Pokemon information using Pokemon APIs.
- tests/fetching_weather_information - Sync and async fetching of current weather of US cities using OpenWeather APIs.
- tests/get_automation_builds - Fetching metadata of sessions created for running tests on the LambdaTest Cloud Grid. LambdaTest Sessions API is used to fetch the respective information.
- tests/url_health_checking - Sync and async implementation for checking the health of links present on LambdaTest Selenium Playground.
- tests/web_scraping - Scraping of items on LambdaTest eCommerce Playground using sync and async programming in Python.
Apart from the above-mentioned directories, the project also contains the following files in the project’s root directory:
- conftest.py - Configuration file in pytest used for sharing fixtures, hooks, and other configuration settings across test files in the suite.
- Makefile - Contains commands used for executing tests in sync and async mode in Python.
- pyunitsetup.py - Implementation for setting up the browser instance, setUp, and tearDown functions.
- requirements.txt - Contains a list of packages or libraries (e.g., bs4, aiohttp, etc.) required for the implementation and execution.
As stated earlier, it is recommended to have Python 3.4 (or later) since the Python asyncio library is available out of the box in those versions of Python.
Project Prerequisites
It is recommended to use a virtual environment (venv
) since it helps in better management of dependencies and environments. In a nutshell, the virtual environment isolates the packages from the base environment.
Note: Please replace pip3 with pip depending on the pip version installed on your machine.
Run the commands virtualenv venv
and source venv/bin/activate
on the terminal to create the virtual environment. Now that the virtual environment is ready, let’s install the required libraries listed in requirements.txt. Invoke the command pip3 install -r requirements.txt
on the terminal. Here is the list of libraries that we have installed for the demonstration:
| Library | Description | Link |
| --- | --- | --- |
| pytest-xdist | Helps realize parallel test execution with pytest | https://pypi.org/project/pytest-xdist/ |
| pytest-asyncio | Provides support for coroutines as test functions | https://pypi.org/project/pytest-asyncio/ |
| requests | HTTP library used for making HTTP requests in a synchronous manner | https://pypi.org/project/requests/ |
| pytest-order | Allows customization of the order in which the tests in pytest are executed | https://pypi.org/project/pytest-order/ |
| bs4 | Allows scraping of information from HTML and XML documents | https://pypi.org/project/beautifulsoup4/ |
| aiohttp | Asynchronous HTTP client/server framework; used for accelerated web scraping in Python | https://pypi.org/project/aiohttp/ |
| python-dotenv | Reads key-value pairs from a .env file | https://pypi.org/project/python-dotenv/ |
In the interest of time, we will be deep-diving into the following scenarios in the further sections of this Python asyncio tutorial:
- Web scraping
- URL health checking
- Fetching weather information
- Getting session details on LambdaTest
We will be benchmarking async
with sync
using the Hyperfine command-line tool. In case you are using macOS, run the command brew install hyperfine
on the terminal for installing Hyperfine in the execution environment. At the time of writing this Python asyncio tutorial, the latest version of Hyperfine is 1.18.0. Run the command hyperfine --help
in case you need more information about the usage of Hyperfine.
Asynchronous Web Scraping in Python
Web scraping in Python is one of the popular use cases where libraries like requests
, BeautifulSoup
(bs4
), etc., can be leveraged to scrape information from a document.
In this Python web scraping repo, I have used the synchronous approach to scraping. Though synchronous web scraping with requests
and bs4
does the job well, it might falter in performance (or scraping time) if information has to be scraped from a large number of pages.
We will be scraping content from LambdaTest eCommerce Playground like it is done in the repo where we have used bs4
and requests
for sync web scraping. Since we will be benchmarking the sync vs. async performance, we first scrape content on the eCommerce Playground using the sync approach (using bs4
and requests
libraries in Python).
Implementation (Synchronous Web Scraping in Python)
For simplification, we will be porting the sync code to its async equivalent by doing the following modifications:
Porting Sync Implementation to Asyncio
Though BeautifulSoup/bs4
is not recommended due to its synchronous nature, we are using it to simply parse the HTML content from the eCommerce Playground.
Implementation (Asynchronous Web Scraping in Python)
```python
import asyncio
import aiohttp
import sys
import ssl
import certifi
import time
from pprint import pprint
from bs4 import BeautifulSoup

sys.path.append(sys.path[0] + "/../../")
from pageobject.locators import locators
from pageobject.helpers import helpers

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.text()

# Encountered the below error
# aiohttp.client_exceptions.ClientConnectorCertificateError: Cannot connect to
# host ecommerce-playground.lambdatest.io:443 ssl:True
# [SSLCertVerificationError: (1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed:
# unable to get local issuer certificate (_ssl.c:1000)')]
# Solution: https://stackoverflow.com/a/66842057/126105
async def scrap_ecommerce(url):
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
        html = await fetch(url, session)
        soup = BeautifulSoup(html, 'html.parser')
        rows = soup.select('.product-layout.product-grid.no-desc.col-xl-4.col-lg-4.col-md-4.col-sm-6.col-6')
        meta_data_arr = []
        for row in rows:
            link = row.find("a", class_='carousel d-block slide')
            name = row.find("h4", class_='title')
            price = row.find("span", class_='price-new')
            meta_data_dict = {
                'product link': link.get('href'),
                'product name': name.get_text(),
                'product price': price.get_text()
            }
            meta_data_arr.append(meta_data_dict)
        return meta_data_arr

async def main():
    start_time = time.time()
    base_url = locators.test_bs4_url
    tasks = [scrap_ecommerce(f"{base_url}&page={i}") for i in range(1, 6)]
    results = await asyncio.gather(*tasks)
    for i, result in enumerate(results, 1):
        print(f"Product Page = {base_url}&page={i}")
        print("*********************************************************************************************************")
        helpers.print_scrapped_content(result)
        print()
    print("\nTime elapsed is " + str((time.time() - start_time)) + " seconds")

if __name__ == '__main__':
    asyncio.run(main())
```
To get started, we first import all the required libraries in the project. We came across an aiohttp.client_exceptions.ClientConnectorCertificateError during implementation; the solution was discovered in this Stack Overflow thread. Hence, the certifi
library that validates the trustworthiness of SSL certificates is imported into the code. The aiohttp
and bs4
libraries are also imported for asynchronous HTTP communication with Python asyncio and parsing HTML content, respectively.
Like the earlier example of sync web scraping, the locators
are separated from the core implementation to make the ideal use of the Page Object Model in Python. On similar lines, helpers.py contains the helper functions that would be used in the tests. All the functions in the async-based implementation are marked with the async def keyword.
The scrap_ecommerce()
coroutine does the scraping of the content from the LambdaTest E-Commerce Playground. The URL to be scraped is passed as a parameter to the coroutine. As mentioned earlier, we encountered SSL certificate verification errors. To counter the same, we have created a custom SSL context that includes a trusted certificate authority (CA) bundle. It is used to verify SSL certificates.
The create_default_context()
method of the SSL library creates a new SSL context with the default settings. certifi.where()
returns the path to the CA bundle, a file that contains root and intermediate certificates. The custom SSL context (i.e., ssl_context
) will be used in further sections of the code.
The ssl_context
created in the earlier step is now passed to the TCPConnector()
method of the aiohttp
library for creating a ClientSession
(named session
). HTTP requests made within the session
are always secure with the trusted CA bundle.
The fetch()
coroutine is a helper that takes the URL to be scraped and the currently active session as the input parameters. It asynchronously fetches data (with async HTTP GET request) from the specified URL using the aiohttp
library. With await response.text()
, completion of reading the response content is awaited, and the same is converted into HTML content (of string format). BeautifulSoup(html, ‘html.parser’)
returns a BeautifulSoup object that is used further for scraping.
As mentioned earlier, this is not a time-consuming operation; hence, bs4
is used in asynchronous web scraping. The select()
method of BeautifulSoup (bs4
) finds elements using the CSS Selector property: .product-layout.product-grid.no-desc.col-xl-4.col-lg-4.col-md-4.col-sm-6.col-6
.
As seen below, 15 elements match the CSS Selector, as there are 15 products on the said product page. The method returns a list that is used later for scraping meta-data (i.e., name, price, description, etc.) of every product on the page.
It is important to note that the same logic is also used for synchronous web scraping. A loop is run for scraping information of all the 15 products (or elements) under the div
located in the earlier step. The product link is obtained by locating the element using the find()
method of bs4
. The first argument is the tag that needs to be searched for (i.e., ‘a’ – anchor tag), and the second is the CSS Class attribute.
Along similar lines, the price of the product/element is obtained by locating the element using the find()
method of bs4
along with the class selector (i.e., price-new
). The get_text()
method of bs4 provides the product name and price when used with the respective elements.
Every dictionary entry (i.e., meta_data_dict
representing the product link, product name, and product price) is finally appended to a list in Python. Since we have to scrape product information from Page 1 through Page 5, we first create tasks for scraping information from the said pages in a concurrent fashion.
The tasks created in the earlier step are run concurrently by invoking the gather()
method of the Python asyncio
library. The return_exceptions
parameter in the method is set to False (its default), which means that the first exception raised while scraping any page is propagated immediately to the awaiting caller; the other tasks are not cancelled and continue to run.
The final result is available in the results
list.
It is the above step that leverages the advantages offered by tasks in Python asyncio
and aiohttp
library for concurrent scraping of multiple pages (Page 1 through Page 6). Shown below is its sync equivalent, where the scrap_commerce()
method is called for every page but in a synchronous manner.
Now that the content is scraped, we invoke the helper print_scrapped_content()
for printing the scraped content on the console. asyncio.run() is invoked from the top-level synchronous code to execute the main() coroutine, blocking until it completes.
Benchmarking: Sync and Async Web Scraping
Invoke the command make perform-web-scraping
for benchmarking the scraping use case using the Hyperfine command-line utility. The warmup
option in Hyperfine is set to 3. Hence, the actual benchmarking starts after three warm-up runs. The show-output
option in Hyperfine shows the command output/execution on the terminal.
As seen from the benchmarking results, asynchronous web scraping is close to 2.93 times faster than its synchronous counterpart. Though the number looks a tad smaller, it is something that can make a huge performance impact when used in scraping a large number of documents (or pages).
FastAPI With Asyncio for High-Performance APIs
FastAPI is a popular modern and high-performance web framework that is used to build APIs with Python. The framework is designed to optimize the overall developer experience so that you can build production-ready APIs while keeping the best practices in mind. As stated in the FastAPI official documentation, FastAPI's performance is on par with that of NodeJS and Go. It is built on open standards for APIs, i.e., OpenAPI (earlier known as Swagger) and JSON Schema. Like other Python frameworks, FastAPI also increases developer productivity and minimizes duplication of code.
FastAPI, when combined with Python asyncio, results in improved throughput and faster response times, as Python asyncio helps better handle simultaneous incoming requests. This makes the application more scalable, as Python asyncio effectively manages the system resources. With Python asyncio, you can build high-performance and low-latency APIs that are capable of handling high loads with ease.
To install FastAPI, you need to trigger pip3 install fastapi uvicorn
on the terminal. uvicorn
is the server that will use the API you build to serve requests.
At the time of writing this Python asyncio tutorial, the latest versions of FastAPI and uvicorn are 0.112.2 and 0.30.6 respectively.
It is recommended to separate the tests from the core application logic, very similar to what we normally do in the Page Object Model in Python. For FastAPI, the core application logic (i.e., routines, dependencies, configurations, etc.) is normally placed in the app
folder, whereas the test logic is placed in the tests
folder. However, you can have a different directory structure depending on the project requirements. In a nutshell, opt for a directory structure that makes the code more maintainable and scalable in the long run.
Demonstration: FastAPI With Asyncio
In order to demonstrate the capabilities of FastAPI with Python asyncio, we will develop a simple FastAPI application that interacts with the LambdaTest APIs. The APIs are used for fetching details associated with the builds and sessions executed from my account. The core application logic is in app/main.py
, and the tests that use the application logic are placed in tests/test_main.py
. The APIs that fetch the build and session information asynchronously are located in tests/fastAPI/app/main.py
.
Let’s look at some of the most important aspects of the code! First, we import the FastAPI class, which is used for creating the FastAPI application. An instance of that class is normally assigned to a variable (e.g., app
). The newly created app
object is used to define the web application, including endpoints, configurations, and more.
As we are fetching information from LambdaTest using LambdaTest REST APIs, the username and access key are read from the environment variables LT_USERNAME
and LT_ACCESS_KEY
. As stated earlier, an object (named app
) of the FastAPI class is created using app = FastAPI()
.
As seen in the implementation, the @app.get("/")
decorator is used to define a route/endpoint that responds to GET requests at the root URL ("/"
). In our case, it is @app.get("/builds/")
and @app.get("/sessions/")
. get_lambdatest_all_builds()
and get_lambdatest_all_sessions()
are two asynchronous methods for fetching build- and session-related information, respectively.
As seen below, we first construct the URL with a combination of user_name
, access_key
, and the LambdaTest API. The limit
parameter is set to 50 so that details about the first 50 builds are fetched with the API. The request headers are set up to specify that the client expects a JSON response from the server.
Next, an asynchronous context manager (async with
) is used to make a GET request to the LambdaTest API with the session object. It is an instance of an HTTP client session (e.g., aiohttp.ClientSession
). The status code of the response object is checked; a successful request returns a status of 200. If the request is not successful, an exception is raised and the error message returned by the server is printed.
As stated in the FastAPI official documentation, the @app.get("/builds")
route decorator tells FastAPI that the function right below it is responsible for handling requests that go to the path /builds/
using a GET operation. First, an asynchronous HTTP client session (ClientSession
) is created using the aiohttp
library.
As seen earlier, the ClientSession
object is used to manage HTTP requests within the session. The session object is passed to the get_lambdatest_all_builds()
method. It returns data in a JSON format if the LambdaTest API returns build information. The data
field is extracted from the JSON response (i.e., builds_data
). An empty list is returned if the data
field does not exist.
The uvicorn.run(app, host="0.0.0.0", port=8000)
command runs the FastAPI application (i.e., app
) that was created earlier using the app = FastAPI()
method. The argument host="0.0.0.0"
tells Uvicorn to listen to all available IP addresses. The other argument, port=8000
, specifies the port on which the server should listen for incoming requests.
Now that we have covered the methods that are part of the business logic, let’s look at the test methods used for testing the APIs implemented earlier. The test methods are part of tests/test_main.py
.
We import the TestClient
class from the fastapi.testclient
module. TestClient
helps simulate requests to the FastAPI application and receive responses from the application. After the app
directory (i.e., tests/app
) is added to the system path, the FastAPI application instance/object (app
) is imported from the module named main
.
Next, we create a TestClient
for the FastAPI application by invoking client = TestClient(app)
. In the application code, we created two routes/endpoints: builds
and sessions
. Here in the test code, we simulate an HTTP GET request to the respective endpoints (builds
and sessions
).
If the status of the response is OK (i.e., 200), we parse the body of the HTTP response as JSON. A Python assert
is raised if the value associated with the dashboard_urls
key is not of type list
.
With this, we are all set to execute the FastAPI Python asyncio tests. The tests will run on the LambdaTest platform. It is an AI-powered test execution platform that allows developers and testers to run Python automated tests at scale across various operating systems and web browsers online.
After exporting the environment variables LT_USERNAME
and LT_ACCESS_KEY
, run the command make fast-api-asyncio
on the terminal to execute the tests implemented in tests/test_main.py
. As seen below, the details of the builds and sessions associated with my LambdaTest account are printed successfully.
What we have demonstrated here is just scratching the surface as far as FastAPI with Python asyncio is concerned. Their combination can be harnessed to develop high-performing, scalable, efficient, and fast web applications.
Asynchronous URL Health Check in Python
One of the popular use cases of Python asyncio
is checking the health of a web service (or endpoint) by periodically analyzing responses received from requests sent to a specified URL. It can also be used for API testing, leveraging different HTTP methods (e.g., GET
, POST
, PUT
, DELETE
) supported by the respective API. Website monitoring and Service Level Agreements (SLAs) are other prominent use cases of URL health checking with Python.
In all cases, the first and foremost step is to check the availability of the URL and verify whether the response received has a status code of STATUS_OK
(or 200
). To demonstrate the usage of Python asyncio
, we will perform a health check of the URLs present in the LambdaTest Selenium Playground. Similar to the previous example, we will benchmark the performance of synchronous and asynchronous implementations.
Implementation (Synchronous URL Health Checking in Python)
First, we scrape all the URLs, which are stored in the meta_data_arr array. Now that the URLs are available, a for loop iterates through every URL in meta_data_arr. For each URL, an HTTP GET request is sent by invoking the get()
method of the requests
library. The status code of the received response is stored in the variable status_code
. As mentioned earlier, the health of the URL is fine (or is reachable) if the response to GET is 200. For instance, the link to Ajax Form Submit is reachable and should return a response of 200 when it is requested using the GET method. The same principle applies to all the other links present in the Playground. Finally, an assert is raised if the URL is not reachable (or the status code is not 200).
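The synchronous flow described above can be sketched as follows. The check_url_health() helper and its injectable getter parameter are hypothetical, added so the status-checking logic can be exercised without a live network:

```python
import requests

def check_url_health(urls, getter=requests.get):
    # 'getter' defaults to requests.get; it is injectable so the logic can
    # be tested against a fake response object without network access
    unhealthy = []
    for url in urls:
        status_code = getter(url).status_code
        # A URL is considered healthy when GET returns 200 (STATUS_OK)
        if status_code != 200:
            unhealthy.append((url, status_code))
    return unhealthy
```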
Implementation (Asynchronous URL Health Checking in Python)
Let’s port the existing code such that it runs asynchronously. Here are the top-level changes in the implementation:
- [Optional] Marking the tests with the @pytest.mark.asyncio decorator. However, this step is optional as we have already added asyncio_mode = auto in pytest.ini. You can refer to this Stack Overflow thread for more information.
- Replacing the synchronous requests library with the asynchronous aiohttp library for performing multiple asynchronous HTTP requests concurrently.
- Using the gather() method of the Python asyncio library to run multiple tasks concurrently.
Shown below is the complete implementation of Asynchronous URL health checking in Python:
```python
import sys
import time
import asyncio
import aiohttp
import pytest

# Append the project root before importing the pageobject package
sys.path.append(sys.path[0] + "/../../")

from pageobject.locators import locators
from pageobject.helpers import helpers

class TestAsyncHealthCheckOps:
    @pytest.mark.asyncio
    @pytest.mark.run(order=1)
    async def test_async_url_access(self, driver):
        start_time = time.time()

        driver.get(locators.test_playground_url)
        driver.maximize_window()
        meta_data_arr = helpers.scrap_playground_url(driver)

        async def check_status(session, url):
            async with session.get(url) as response:
                status_code = response.status
                print(url + " status = " + str(status_code) + " ")
                return status_code

        async with aiohttp.ClientSession() as session:
            tasks = [check_status(session, url) for url in meta_data_arr]
            status_codes = await asyncio.gather(*tasks)

        for status_code, url in zip(status_codes, meta_data_arr):
            assert status_code == 200, f"Failed for URL: {url}, Status Code: {status_code}"

        print("\nTime elapsed is " + str((time.time() - start_time)) + " seconds")
```
For this scenario, we have used the pytest
framework in Python. The execution is performed on a headless Chrome browser since we won’t be interacting with web elements on the page. As we are using pytest
, the fixtures and hooks are located in the conftest.py
configuration file, which houses the required settings. The environment variable EXEC_ASYNC
is set to true
for async execution.
As shown below, the to_thread()
method of the Python asyncio
library is used for creating Chrome browser instances in a separate thread. The method (i.e., driver()
) used for creating a browser instance is marked with the pytest
fixture in Python with a function scope. Hence, the fixture is invoked for the test methods where it is being used.
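The thread-offloading idea behind the fixture can be sketched as follows. The create_driver() function is a stand-in for the blocking webdriver.Chrome(...) constructor, so the sketch runs without Selenium installed:

```python
import asyncio

def create_driver():
    # Stand-in for the blocking webdriver.Chrome(...) constructor; the real
    # fixture would return a Selenium WebDriver instance here
    return {"browser": "headless-chrome"}

async def get_driver():
    # asyncio.to_thread() (Python 3.9+) runs the blocking constructor in a
    # separate thread so the event loop is not blocked while it completes
    return await asyncio.to_thread(create_driver)

driver = asyncio.run(get_driver())
```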
Once the headless Chrome instance is created, we first scrape all the URLs present on the LambdaTest Selenium Playground. The helper method scrap_playground_url()
returns an array containing the scraped content. Let’s dive into it!
We first locate the entire grid housing the links using the find_element()
method in Selenium. The element is located using the XPath Selector:
loc_parent_elem = driver.find_element("xpath", "//*[@id='__next']/div/section[2]/div/ul")
The child elements in loc_parent_elem
with the class name pt-10
are located using the find_elements()
method:
loc_list_elems = loc_parent_elem.find_elements("class name", "pt-10")
Like the earlier example, we iterate over each WebElement
in the loc_list_elems
list. The href
attribute of the child element, located using the CSS Selector .text-black.text-size-14.hover\:text-lambda-900.leading-relaxed
, contains the link to a page. All the extracted links (final_link
) are appended to the meta_data_arr
array, which is returned by the helper function.
Now that we have scraped the URLs present on the page, the next step is to check whether the URL is reachable. Before that, we create an asynchronous session (session
) using the ClientSession
class from the aiohttp
library. The session is used to manage the HTTP requests and responses.
The check_status()
coroutine takes the session (created earlier) and the target URL as input parameters. The method asynchronously fetches data from the specified URL using the aiohttp
library. As shown above, the get()
method sends an HTTP request to the URL provided as a parameter. The response
object, an instance of aiohttp.ClientResponse
, contains the client response. The status
attribute provides the HTTP status of the response.
Next, we create a list of tasks to run asynchronously. The list iterates over each URL in meta_data_arr
and creates a coroutine for the URL by invoking the check_status()
coroutine. The gather()
method of Python asyncio
runs the tasks asynchronously. Tasks are unpacked and passed as separate arguments to gather()
.
Upon execution, we have a list of status_codes
(or response codes) for each URL.
Finally, we run a for
loop that iterates over two lists: the status_codes
list and the meta_data_arr
list. It asserts whether the status code for the corresponding URL in meta_data_arr
is anything other than STATUS_OK
(or 200
).
Benchmarking: Sync and Async URL Health Checking
Invoke the check-url-health
command to benchmark the specified use case using the Hyperfine command-line utility. The benchmark is conducted after 10 successful runs of both the sync and async implementations. The --show-output
option in Hyperfine displays the command output/execution on the terminal.
As seen from the benchmarking results, asynchronous URL health checking is close to 1.70 times faster than its synchronous counterpart. The impact of async URL health checking will be monumental if the page/document (under test) contains a large number of links.
Asynchronous Weather Check in Python
In this example, we will extract weather information about US cities using OpenWeather APIs. Once you create an account on OpenWeather, you need to copy the OpenWeather API key from the API Keys section. After this, create an environment variable by invoking the following command on the terminal:
export OPEN_WEATHER_API=<API-KEY>
As mentioned in the official OpenWeather documentation, gathering weather information for a particular city is possible via the following API:
API: Current Weather Data
https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lon}&appid={API key}
- lat: latitude of the location
- lon: longitude of the location
- appid: your OpenWeather API key
We tried out the API for a latitude and longitude combination, which provided the weather information in a JSON format.
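For illustration, an abridged (hypothetical) version of such a JSON payload, and the fields this example cares about, could look like the following; the temperature lives under the nested main key and is returned in Kelvin by default:

```python
import json

# Abridged, illustrative payload; a real OpenWeather response carries many more fields
sample_response = """
{
  "name": "Durango",
  "main": {"temp": 284.5, "humidity": 40}
}
"""

weather_info = json.loads(sample_response)
print(weather_info["name"])          # city name
print(weather_info["main"]["temp"])  # temperature in Kelvin (the API default)
```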
We are still scraping data here, but the data set is considerably larger than in the earlier examples. For demonstration, we first scrape the city name, latitude, and longitude of US cities listed on LatLong.net.
The listing spans Page 1 through Page 13; the scraped data is fed to the OpenWeather API for fetching weather data. Like the previous examples, we use requests
and bs4
for sync weather fetching, and aiohttp
, bs4
, and tasks (from the Python asyncio
library) for async fetching of weather data.
Implementation (Synchronous Weather Fetching in Python)
# Beautiful Soup Official Documentation - https://www.crummy.com/software/BeautifulSoup/bs4/doc/
# Import the locators file
import sys
sys.path.append(sys.path[0] + "/../../")
from pageobject.locators import locators
from pageobject.locators import *
from pageobject.helpers import helpers
from pageobject.helpers import *
from dotenv import load_dotenv

# os, time, requests, and BeautifulSoup come in via the wildcard imports above
load_dotenv()
api_key = os.getenv('OPEN_WEATHER_API')

###### Page 1: https://www.latlong.net/category/cities-236-15-1.html ######
start_page = 1
###### Last page (13): https://www.latlong.net/category/cities-236-15-13.html ######
# Only the first few pages are scraped in this demo; range() excludes last_page
last_page = 5

weather_data_arr = []
def scrap_weather_site(url) -> list:
    response = requests.get(url)
    if response.status_code != 200:
        print(f"Unable to fetch the page. Status code: {response.status_code}")
        return None
    soup = BeautifulSoup(response.text, 'html.parser')
    # Skip the header row when collecting the table rows
    rows = soup.find_all('tr')[1:]
    for row in rows:
        td_tags = row.find_all('td')
        # Extract values/text from all <td> tags
        td_values = [td.get_text(strip=True) for td in td_tags]
        weather_data_dict = {
            'location': td_values[0],
            'latitude': td_values[1],
            'longitude': td_values[2]
        }
        weather_data_arr.append(weather_data_dict)
    return weather_data_arr

def get_weather_info(latitude, longitude):
    url = f"https://api.openweathermap.org/data/2.5/weather?lat={latitude}&lon={longitude}&appid={api_key}"
    try:
        response = requests.get(url)
        response.raise_for_status()
        weather_data = response.json()
        return weather_data
    except requests.exceptions.RequestException as e:
        print("Error fetching weather information:", e)
        return None

if __name__ == '__main__':
    start_time = time.time()
    # Iterate over the paginated listing; range() excludes last_page
    for iteration in range(start_page, last_page):
        test_url = locators.test_weather_url + "-" + str(iteration) + ".html"
        meta_data_arr = scrap_weather_site(test_url)
        for value in meta_data_arr:
            # Extract latitude and longitude
            # Example - {'location': 'Durango, CO, USA', 'latitude': '37.270500', 'longitude': '-107.878700'}
            latitude = value['latitude']
            longitude = value['longitude']
            weather_info = get_weather_info(latitude, longitude)
            if weather_info:
                temperature = weather_info["main"]["temp"]
                city_name = weather_info["name"]
                print(f"Temperature in {city_name} is: {temperature}")
    print("\nTime elapsed is " + str((time.time() - start_time)) + " seconds")
Here, we have two primary methods:
- scrap_weather_site(): Scrapes the latitude, longitude, and city name from LatLong.net
- get_weather_info(): Uses the OpenWeather Current Weather Data API to fetch the weather information for the latitude and longitude obtained in the earlier step
Since Beautiful Soup (bs4) is also used in the async implementation, we will cover the code walkthrough in that section.
Implementation (Asynchronous Weather Fetching in Python)
Instead of the requests
library, we have used the aiohttp
library for asynchronous handling of HTTP requests and responses. Also, asyncio.gather()
is leveraged to handle multiple tasks asynchronously. Here is the complete implementation of Asynchronous Weather Fetching in Python:
import sys
sys.path.append(sys.path[0] + "/../../")
from pageobject.locators import locators
from pageobject.locators import *
from pageobject.helpers import helpers
from pageobject.helpers import *
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('OPEN_WEATHER_API')
start_page = 1
last_page = 5
weather_data_arr = []
async def fetch_data(url, session):
    async with session.get(url) as response:
        if response.status != 200:
            print(f"Unable to fetch the page. Status code: {response.status}")
            return None
        else:
            return await response.text()

async def scrap_weather_site(url, session):
    html_content = await fetch_data(url, session)
    if html_content is not None:
        soup = BeautifulSoup(html_content, 'html.parser')
        # Skip the header row when collecting the table rows
        rows = soup.find_all('tr')[1:]
        for row in rows:
            td_tags = row.find_all('td')
            td_values = [td.get_text(strip=True) for td in td_tags]
            weather_data_dict = {
                'location': td_values[0],
                'latitude': td_values[1],
                'longitude': td_values[2]
            }
            weather_data_arr.append(weather_data_dict)

async def get_weather_info(session, latitude, longitude):
    url = f"https://api.openweathermap.org/data/2.5/weather?lat={latitude}&lon={longitude}&appid={api_key}"
    async with session.get(url) as response:
        if response.status != 200:
            print(f"Error fetching weather information for latitude {latitude} and longitude {longitude}.")
            return None
        else:
            weather_info = await response.json()
            return weather_info

async def main():
    # Use a custom SSL context to avoid SSLCertVerificationError
    # (certificate verify failed) on some machines
    # Solution: https://stackoverflow.com/a/66842057/126105
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
        tasks = []
        for iteration in range(start_page, last_page):
            test_url = locators.test_weather_url + "-" + str(iteration) + ".html"
            tasks.append(scrap_weather_site(test_url, session))
        await asyncio.gather(*tasks)

        tasks = []
        for value in weather_data_arr:
            latitude = value['latitude']
            longitude = value['longitude']
            tasks.append(get_weather_info(session, latitude, longitude))
        weather_infos = await asyncio.gather(*tasks)

        for weather_info in weather_infos:
            if weather_info:
                temperature = weather_info["main"]["temp"]
                city_name = weather_info["name"]
                print(f"Temperature in {city_name} is: {temperature}")

if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(main())
    print("\nTime elapsed is " + str((time.time() - start_time)) + " seconds")
Like before, the create_default_context()
method of the SSL library is used to create a new custom SSL context. It is then passed to the TCPConnector()
method of the aiohttp
library for creating a ClientSession
(named session
). This session is used throughout the example. In the main()
function, a for loop runs over range(start_page, last_page), scraping latitude and longitude information from the first pages of the LatLong website (pages 1 through 4 here, since range() excludes last_page). The page format is shown below:
- Page 1 - https://www.latlong.net/category/cities-236-15-1.html
- Page 5 - https://www.latlong.net/category/cities-236-15-5.html
Since we have to scrape content from multiple pages, the scraping is performed asynchronously. Calling the scrap_weather_site()
coroutine function returns a coroutine object that, when awaited, scrapes content from test_url
using the session created in the first step. Each coroutine object is appended to the tasks
list.
Let’s take a quick look at the scrap_weather_site()
coroutine. It takes the URL (under test) and the current session as input parameters. First, the fetch_data()
function is invoked for fetching the HTML content (response.text()
) of the URL provided to the method. Now that we have the HTML document, we parse it using the html.parser
parser of BeautifulSoup (bs4
). The first row in the table contains the field titles, so they can be skipped during parsing. Therefore, [1:]
is added when searching for <tr>
elements in the parsed HTML. With this, parsing of rows starts from row number 2.
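To see these parsing steps in isolation, here is a self-contained sketch that feeds an inline HTML snippet (illustrative, not the live page) through the same bs4 calls:

```python
from bs4 import BeautifulSoup

# Illustrative two-row table mimicking the LatLong.net layout
html = """
<table>
  <tr><th>Place Name</th><th>Latitude</th><th>Longitude</th></tr>
  <tr><td> Durango, CO, USA </td><td>37.270500</td><td>-107.878700</td></tr>
</table>
"""

soup = BeautifulSoup(html, "html.parser")
rows = soup.find_all("tr")[1:]  # [1:] skips the header row
for row in rows:
    # strip=True trims leading/trailing whitespace from each cell
    td_values = [td.get_text(strip=True) for td in row.find_all("td")]
    print(td_values)
```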
The find_all()
method of bs4
is used for searching all the rows matching the request. A for
loop is run, iterating through each row. The <td>
element in each row contains the metadata: place name, latitude, and longitude. With td_values = [td.get_text(strip=True) for td in td_tags]
, the leading (and trailing) spaces are removed from each <td>
element. Now that we have the data from every cell, a dictionary (named weather_data_dict
) is created with the data. After that, it is appended to the weather_data_arr
array.
Now that we have the tasks
list, asyncio.gather()
is invoked to run all the tasks asynchronously. At this point, we have successfully scraped the latitude and longitude from every page in the scraping range. Next, a for
loop runs over weather_data_arr
, and get_weather_info()
is invoked asynchronously. Let's dive deeper into that method.
The get_weather_info()
method takes the current session, latitude, and longitude as input parameters. The OpenWeather Current Weather Data API is supplied with the latitude, longitude, and OpenWeather API key, and the JSON response is obtained by making an asynchronous GET
request to the OpenWeather API. Calling get_weather_info()
only creates a coroutine object; tasks.append()
adds it to the list, and the coroutines are actually executed later, when the list is passed to asyncio.gather().
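The distinction between creating and running a coroutine can be seen in a minimal, network-free sketch (the stub coroutine below is illustrative, not the article's implementation):

```python
import asyncio

async def get_weather_stub(latitude: float, longitude: float) -> dict:
    # Stub in place of the real aiohttp call
    await asyncio.sleep(0.01)
    return {"lat": latitude, "lon": longitude}

async def main() -> list:
    coro = get_weather_stub(37.27, -107.87)
    print(type(coro).__name__)  # 'coroutine' -- nothing has run yet
    tasks = [coro, get_weather_stub(40.71, -74.00)]
    # gather() is what actually schedules and awaits the coroutines
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(results)
```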
Finally, the tasks (or multiple coroutines) are executed asynchronously via the gather()
method of the Python asyncio
library, and the execution results of the coroutines are aggregated into a single list. Like before, return_exceptions
is left at False
(the default), which means the first exception raised by any coroutine propagates immediately to the caller of gather(). In the response, weather_info["main"]["temp"]
(the main
field with the nested temp
field) provides the current temperature of the respective US city, while the name
field in weather_info
provides the city name. Finally, the city name and temperature are printed on the terminal.
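The effect of return_exceptions on failure handling can be demonstrated with a small, self-contained sketch:

```python
import asyncio

async def ok() -> str:
    return "ok"

async def boom() -> str:
    raise ValueError("failed request")

async def main() -> tuple:
    # Default (return_exceptions=False): the first exception propagates
    try:
        await asyncio.gather(ok(), boom())
        propagated = False
    except ValueError:
        propagated = True

    # return_exceptions=True: exceptions become ordinary result entries
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    return propagated, results

propagated, results = asyncio.run(main())
print(propagated)        # True: the default re-raised the ValueError
print(results[0])        # 'ok'
print(type(results[1]))  # the ValueError, returned instead of raised
```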
Benchmarking: Sync and Async Weather Check
Invoke the command make fetch-sync-weather-info
for fetching weather information of the supplied latitude(s) and longitude(s) synchronously. The operation was completed in approximately 325 seconds.
Invoke the command make fetch-async-weather-info
for fetching weather information of the supplied latitude and longitude asynchronously using aiohttp
and asyncio
. The operation was completed in approximately 7 seconds.
We tried benchmarking with a few more execution cycles and async weather checking was significantly faster than its sync counterpart.
We have more examples, such as fetching Pokémon names and getting LambdaTest automation session details, available in the GitHub repository. A plugin named pytest-asyncio-cooperative
can also be leveraged for cooperative multitasking to run your I/O-bound test suite efficiently and quickly. As stated in the official documentation, the pytest-asyncio
plugin is not compatible with the pytest-asyncio-cooperative
plugin! Additionally, FastAPI, which is built on top of Python asyncio
, can further improve efficiency. This combination helps reduce latency when handling requests, resulting in faster response times. Covering FastAPI is beyond the scope of this Python asyncio
tutorial, but a separate blog would certainly do justice to the combination of Python asyncio
and FastAPI.
It’s a Wrap
Thanks for making it this far; it was definitely a long journey! As covered extensively in the tutorial, Python asyncio
offers significant benefits when it comes to concurrent and accelerated test execution. This plays a major role in boosting the application's performance. If you need to mix blocking I/O with the Python asyncio
library, we recommend checking out ThreadPoolExecutor
(via loop.run_in_executor()), which offloads blocking calls from the asyncio
event loop; for CPU-bound tasks, ProcessPoolExecutor
is the better fit. Prominent use cases, like web scraping and operations involving databases, can benefit from asynchronous programming using the Python asyncio
library.
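As a parting sketch, the offloading pattern mentioned above could look like the following (blocking_fetch is a hypothetical stand-in for any blocking call, such as a legacy HTTP client or a file read):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_fetch(n: int) -> int:
    # Hypothetical blocking call; time.sleep() would stall the event loop
    # if awaited directly on the main thread
    time.sleep(0.05)
    return n * 2

async def main() -> list:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # run_in_executor keeps the event loop free while the threads block
        futures = [loop.run_in_executor(pool, blocking_fetch, n) for n in (1, 2, 3)]
        return await asyncio.gather(*futures)

results = asyncio.run(main())
print(results)  # [2, 4, 6]
```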
Published at DZone with permission of Himanshu Sheth. See the original article here.