
Python Asyncio Tutorial: A Complete Guide

Learn to optimize test execution time and manage coroutines for efficient concurrency with Python asyncio, ideal for developers looking to streamline workflows.

By Himanshu Sheth · Nov. 20, 24 · Tutorial

Test execution time plays a key role in speeding up releases, especially when testing at scale. It largely depends on how well the test suites are designed, their ability to run tests concurrently, and the efficiency of the test infrastructure used.

When fetching build details for the last 500 tests, large API responses can delay execution. To avoid blocking other tasks, running API calls asynchronously is ideal, improving overall test efficiency. In this Python asyncio tutorial, we will dive deep into the nuances of asynchronous programming with Python using the asyncio (asynchronous I/O) library that was introduced in Python 3.4. The learnings of this Python asyncio tutorial will help you make the most of coroutines, tasks, and event loops for realizing concurrent execution.

Note: Async IO, AsyncIO, and asyncio are used interchangeably throughout this Python asyncio tutorial.

What Is Asynchronous Programming in Python?

As the name indicates, asynchronous programming is an approach where different tasks can be executed concurrently. This essentially means that the main (or single) thread need not be blocked when other tasks are performing I/O operations, making HTTP requests, and more. 

Tasks running concurrently in Python asyncio event loop 

As seen in the image representation above, tasks waiting for I/O operations or network requests do not block the other tasks, thereby minimizing the idle time and reducing the overall execution time. The Python asyncio library allows concurrency by using coroutines that run in an event loop, which is itself executed in a single thread. As stated in the official documentation of asyncio, the implementation of this library, which was previously called Tulip, has now been a part of the Python standard library since Python 3.4. If you are running a Python version earlier than 3.4 (which is not recommended), you can install the Python asyncio library by triggering the pip install asyncio command on the terminal.

Key methods of the asyncio library


Here are some of the use cases where asynchronous execution in Python asyncio can be highly beneficial:

  • Web applications (e.g., streaming, e-commerce, etc.) that need to handle a large number of simultaneous requests
  • Every web application that uses REST APIs that involve I/O operations (e.g., handling HTTP requests and responses)
  • Web applications using the Microservices architecture, where asynchronous execution can help accelerate handling network calls, interacting with databases, and more.
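Before diving into the building blocks, here is a minimal sketch of what asynchronous execution with asyncio looks like. The function name fetch_data() and the delays are purely illustrative; the example simply shows two waits overlapping instead of running back to back.

Python
 
import asyncio


async def fetch_data(delay):
    # Simulate an I/O-bound operation (e.g., an HTTP request) with a non-blocking sleep
    await asyncio.sleep(delay)
    return f"data after {delay} second(s)"


async def main():
    # Both coroutines run concurrently, so the total time is ~2 seconds, not 3
    results = await asyncio.gather(fetch_data(1), fetch_data(2))
    print(results)


if __name__ == '__main__':
    asyncio.run(main())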

In further sections of the Python asyncio tutorial, we will be deep-diving into the core concepts of the Python asyncio library, i.e., coroutines, event loops, tasks, and async/await syntax.

Essentials of Python Asyncio

The async and await keywords form the fundamentals of asynchronous programming in Python via the Python asyncio library. With Python asyncio, a normal function (i.e., def function_name) becomes an asynchronous function (or coroutine function) using the async keyword (i.e., async def function_name). Defining the function with async def lets it temporarily pause its execution while long-running operations (e.g., I/O, network requests) are in progress.


Control is yielded to the event loop when a coroutine (or task) encounters the await keyword, i.e., when it awaits another coroutine, task, or future. Let's look at each of the components of the Python asyncio library in more detail.

Coroutines

In the context of the Python asyncio library, coroutines can be defined as functions that provide the flexibility to temporarily pause the execution of waiting tasks. This lets the other tasks execute concurrently while waiting for the completion of blocking time-consuming operations like I/O, network requests, file operations, database operations, etc. In simple terms, the CPU is less utilized (or might be free) when I/O (or similar operations) are in progress. 

For instance, copying data to an external hard drive is an I/O operation where the CPU only initiates and accepts the I/O requests. The CPU can be better utilized in such cases to perform other tasks. The same rationale also applies to coroutines in the Python asyncio library. A normal function in Python becomes a coroutine function when it is defined with the async def syntax, and calling it yields a coroutine object. When the await keyword is encountered, the current coroutine is paused, and control is yielded back to the event loop. The event loop continuously monitors the awaitable (e.g., a coroutine, Task, or Future) until completion.

Once the execution of the awaitable or the newly picked-up task is complete, the event loop restores the execution of the paused coroutine. It is important to note that coroutines do not make the code multi-threaded; rather, coroutines run in an event loop that executes in a single thread. Shown below is an example showcasing the usage of coroutine in Python:

Python
 
import asyncio
import sys
import time
from datetime import datetime


async def test_1():
   # Get function name
   # https://stackoverflow.com/questions/5067604/determine-function-name-from-within-that-function
   print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   # Could be an I/O operation, network request, database operation, and more
   await asyncio.sleep(2)
   ret_info = await test_2()
   print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   return "test_1"


async def test_2():
   print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   await asyncio.sleep(2)
   print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   return "test_2"


async def main():
   print("Enter main")
   start_time = time.perf_counter()
   # Execution is paused since the await keyword is encountered.
   # Control is yield back to the event loop and other coroutine (if any) is executed
   # Control is handed back to test_1 once the sleep of 2 seconds is completed
   ret_info = await test_1()
   print(f"Data received from the test_1: {ret_info}" + " " + str(datetime.now().time()))
   ret_info = await test_2()
   print(f"Data received from the test_2: {ret_info}" + " " + str(datetime.now().time()))
   end_time = time.perf_counter()
   print("Exit main")
   print(f'It took {round(end_time - start_time,0)} second(s) to complete.')


if __name__ == '__main__':
   # Run the main coroutine
   asyncio.run(main())


test_1() and test_2() are defined as asynchronous functions (or coroutines). During the execution of test_1(), an await keyword is encountered with an async sleep of 2 seconds. This pauses the coroutine and yields control back to the event loop. The execution of test_1() then remains paused until the test_2() coroutine it awaits has completed. Once test_2() finishes, the test_1() coroutine resumes, and the return value of test_1 is printed on the terminal.

Asynchronous execution of test_1 and test_2 coroutines in Python using asyncio 

Lastly, asyncio.run() is used to run the main coroutine until its completion. asyncio.run() also sets up the event loop, executes the coroutine, and closes the event loop when main() finishes.

Event Loop

The event loop in the Python asyncio library primarily manages the scheduling of asynchronous tasks, callbacks, I/O operations, and more. As stated earlier, the event loop manages and schedules asynchronous operations without blocking the main thread. Since the event loop runs continuously, it monitors the awaitable (e.g., a coroutine, task, or future) until its execution is complete. As soon as the await keyword is encountered, the current coroutine is temporarily paused, and control is yielded to the event loop.

Python asyncio event loop managing asynchronous tasks 

Once the execution of the awaited task (or awaitable) is complete, the event loop resumes the execution of the paused coroutine. In a nutshell, the event loop in the Python asyncio library plays a pivotal role in catalyzing the asynchronous (or concurrent) execution of tasks. Shown below is an example showcasing an event loop in Python:

Python
 
import asyncio
import sys
import time
from datetime import datetime


async def test_1():
   # Get function name
   # https://stackoverflow.com/questions/5067604/determine-function-name-from-within-that-function
   print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   # Could be an I/O operation, network request, database operation, and more
   await asyncio.sleep(2)
   print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   return "test_1"


async def test_2():
   print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   # Sleep of 2 seconds
   await asyncio.sleep(2)
   print("Exit " + sys._getframe().f_code.co_name)
   return "test_2"


async def main():
   print("Enter main")
   start_time = time.perf_counter()
   # Await test_1
   ret_info = await test_1()
   print(f"Data received from the test_1: {ret_info}" + " " + str(datetime.now().time()))
   # Await test_2
   ret_info = await test_2()
   print(f"Data received from the test_2: {ret_info}" + " " + str(datetime.now().time()))
   print("Exit main")
   end_time = time.perf_counter()
   print(f'It took {round(end_time - start_time,0)} second(s) to complete.')


if __name__ == '__main__':
   # Explicitly create a new event loop
   loop = asyncio.new_event_loop()
   asyncio.set_event_loop(loop)
   loop.run_until_complete(main())
   loop.close()


A new event loop object is created using the new_event_loop() method of the Python asyncio library. As stated in this Stack Overflow question, explicitly creating a new event loop is required if the loop needs to run outside the main thread or if a custom event loop policy is used within the application. set_event_loop(loop) sets the newly created loop (named loop) as the current event loop, which ensures that the get_event_loop() method returns this loop.

In case you encounter the DeprecationWarning: There is no current event loop warning, we would suggest going through this Stack Overflow thread for more information. Shown below is the execution of the code, which indicates that the execution of the test_1() coroutine was paused when the await keyword was encountered in the code. Post the async sleep of 2 seconds, the second coroutine test_2() is executed until its completion. The total execution time is 4 seconds. 

Code execution of two coroutines with await in Python asyncio loop 

The close() method of the Python asyncio library closes the event loop (named loop) created earlier once all the tasks are completed. Next, we will see how we can further leverage tasks in Python asyncio for running coroutines on the event loop concurrently.

Tasks

So far, we have seen that the await keyword is used to suspend the execution of the current coroutine until the execution of the awaitable (which could be a coroutine, task, or future) is complete. Hence, it is used for cooperative multitasking, whereby multiple coroutines can run concurrently in a single-threaded environment. However, tasks are a more efficient way of managing the concurrent execution of coroutines.

A task is a wrapper around a coroutine that is scheduled for execution by the event loop. Though both sound very similar, there is a thin line of difference when you opt for tasks instead of merely using the await keyword. 

As seen in the earlier example for Event Loop, the current coroutine (e.g., test_1()) yields the control back to the event loop once the await keyword is encountered in the coroutine. As soon as this occurs, the current coroutine is temporarily paused (or suspended), and the event loop picks up the next available coroutine/Task/Future (e.g., test_2()) for execution. Once the execution of the awaited coroutine (i.e., test_2()) is complete, the suspended coroutine (i.e., test_1()) is resumed for execution. 

However, tasks let you manage concurrent execution of the coroutines more efficiently. Unlike coroutines that are temporarily paused using await, tasks can start coroutines without waiting for them to complete. Tasks in the Python asyncio library let you run/schedule multiple coroutines concurrently on a single thread. 

Let’s consider the earlier example where we have two coroutines: test_1() and test_2(). When converted into tasks, both tasks can run concurrently instead of one waiting for the other to complete execution. In a nutshell, tasks wrapping coroutines test_1() and test_2() not only run in tandem, but the event loop can also utilize the CPU more efficiently.

Coroutines can be wrapped into a task by invoking the create_task() method, which returns a Task object. As stated in the official documentation, the wait_for() method of the Python asyncio library waits for the single Future (or coroutine) to complete with a timeout. 
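As a small, hedged illustration of these two APIs, the sketch below wraps an illustrative slow_operation() coroutine into a task with create_task() and then bounds it with wait_for(); the function name and the 2-second timeout are assumptions made for the example.

Python
 
import asyncio


async def slow_operation():
    # Stand-in for a long-running I/O operation
    await asyncio.sleep(5)
    return "done"


async def main():
    # create_task() schedules the coroutine on the event loop and returns a Task object
    task = asyncio.create_task(slow_operation())
    try:
        # wait_for() awaits the task, but cancels it and raises a timeout error after 2 seconds
        result = await asyncio.wait_for(task, timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("slow_operation() timed out and was cancelled")


if __name__ == '__main__':
    asyncio.run(main())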

On a lighter note, tasks and await in the Python asyncio library are two sides of the same coin. With await, control is transferred back to the event loop that schedules the next awaitable to be run. Though it does provide concurrency, there is still massive room for performance improvement. 

This is where tasks come into the picture, as this wrapper around a coroutine lets you concurrently run multiple coroutines. The usage of tasks ups the overall efficiency of the Event Loop, thereby improving the performance and responsiveness of the code. The example shown in the Event Loop section is ported in a manner where normal coroutines are converted into Tasks.

Python
 
import asyncio
import sys
import time
from datetime import datetime


async def test_1():
   print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   await asyncio.sleep(2)
   print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   return "test_1"


async def test_2():
   print("Enter " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   await asyncio.sleep(2)
   print("Exit " + sys._getframe().f_code.co_name)
   return "test_2"


async def main():
   print("Enter main")
   start_time = time.perf_counter()


   # Create tasks for concurrent execution
   task1 = asyncio.create_task(test_1())
   task2 = asyncio.create_task(test_2())


   # Await both tasks
   ret_info_1 = await task1
   print(f"Data received from test_1: {ret_info_1} " + str(datetime.now().time()))
   ret_info_2 = await task2
   print(f"Data received from test_2: {ret_info_2} " + str(datetime.now().time()))


   print("Exit main")
   end_time = time.perf_counter()
   print(f'It took {round(end_time - start_time, 0)} second(s) to complete.')


if __name__ == '__main__':
   # Run the main coroutine
   asyncio.run(main())


There are two simple coroutines, test_1() and test_2(), each with a sleep of 2 seconds. In main(), the two coroutines are wrapped as tasks by invoking the create_task() method of the Python asyncio library. As soon as the tasks are created, both are scheduled on the event loop. Awaiting task1 pauses main(), and during task1's 2-second sleep the event loop also runs task2. The execution time is 2 seconds (which was earlier 4 seconds) since task2 runs concurrently with task1.

Concurrent execution of test_1() and test_2() as Python asyncio tasks

Tasks should be prioritized over normal coroutines if there is no dependency (e.g., result of one coroutine used in the other one) between coroutines. Invoking multiple API requests (e.g., LambdaTest APIs) and clubbing the results, performing parallel I/O operations, logging data, and running background tasks are some of the scenarios where tasks should be preferred over coroutines. On the whole, concurrent execution of tasks reduces the overall execution time along with improving the efficiency of the code.

Running Tasks Concurrently Using asyncio.gather() 

There could be scenarios where you would want a series of awaitables (e.g., tasks) to be executed concurrently, with the result being an aggregate list of the returned values. One such example is testing multiple API endpoints in a single go where the API response is aggregated in a list. Similarly, batch processing (i.e., extracting and processing data concurrently) is much more efficient using the asyncio.gather() method. As stated in the official documentation, asyncio.gather() lets you run awaitable objects in the aws sequence concurrently.

awaitable asyncio.gather(*aws, return_exceptions=False)


All coroutines (i.e., aws) are automatically converted into tasks. The coroutines passed to the gather() method are executed concurrently, and the results are retrieved after the completion of all the coroutines. If two coroutines (e.g., coroutine_1() and coroutine_2()) are passed to the gather() method, they are first converted into tasks and executed concurrently. The return value is a list of results in the order of the original sequence, not necessarily the order in which the results arrive. The return_exceptions parameter, which is False by default, controls how exceptions raised during the gather operation are handled. We will cover return_exceptions in more detail in the further sections of this Python asyncio tutorial. Shown below is a port of the create_task() example that we demoed earlier. The two coroutines, coroutine_1() and coroutine_2(), are inherently converted into tasks by the asyncio.gather() method.

Python
 
import asyncio
import sys
import time
from datetime import datetime


async def coroutine_1():
   print("Enter asyncio.gather " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   await asyncio.sleep(2)
   print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   return "coroutine_1"


async def coroutine_2():
   print("Enter asyncio.gather " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   await asyncio.sleep(2)
   print("After sleep " + sys._getframe().f_code.co_name + " " + str(datetime.now().time()))
   return "coroutine_2"


async def main():
   print("Enter main")
   start_time = time.perf_counter()
  
   # Use asyncio.gather to run test_1 and test_2 concurrently
   ret_info_1, ret_info_2 = await asyncio.gather(
       coroutine_1(),
       coroutine_2(),
       return_exceptions = True
   )
  
   print(f"[asyncio.gather] Data received from coroutine_1: {ret_info_1} " + str(datetime.now().time()))
   print(f"[asyncio.gather] Data received from coroutine_2: {ret_info_2} " + str(datetime.now().time()))
   print("Exit main")
  
   end_time = time.perf_counter()
   print(f'It took {round(end_time - start_time, 0)} second(s) to complete.')


if __name__ == '__main__':
   asyncio.run(main())


The coroutines/tasks are scheduled and run concurrently by the gather() method of the Python asyncio library. The return value of the task(s) execution is captured in variables ret_info_1 and ret_info_2, respectively. Apart from gathering tasks, the rest of the execution and test logic remains unchanged. The total execution time is 2 seconds even though the execution of both the concurrently-running coroutines was paused for 2 seconds using the asyncio.sleep() method. 

Asyncio coroutines running concurrently with gather method 

To summarize, gather() in Python asyncio helps improve performance by reducing wait times, provides robust error handling, and collects the results in a list.

Handling Exceptions in asyncio.gather() 

Exceptions could occur during the execution: an exception might be raised in one or more tasks, or a task might be canceled, raising the CancelledError exception. The manner in which exceptions are handled is controlled by the return_exceptions parameter in gather(). By default, return_exceptions is False; hence, any raised exception is propagated to the task that awaits on the gather().

In the snippet below, a list consisting of four coroutines is passed to gather(). Since return_exceptions is set to False, the first exception raised by any coroutine/task is immediately propagated to the coroutine that awaits gather(), and the results of the remaining awaitables are discarded (per the official documentation, the other awaitables are not cancelled and continue to run). So if an exception is raised in coroutine_1, that is what the awaiting code sees, and the results of coroutine_2, coroutine_3, and coroutine_4 are lost. If return_exceptions is True, all the tasks (or coroutines) complete their execution, even if one or more tasks raise an unhandled exception.

Exceptions raised, if any, are provided as return values in the results list returned from gather(). Like the earlier scenario, coroutine_1 raises an exception. Since this time around return_exceptions is set to True, the exception raised by coroutine_1 is added to the returned list, and all the other awaitables (i.e., coroutine_2, coroutine_3, and coroutine_4) complete their execution.

Python gather() function handling exceptions example 

In the example below, we have four coroutines that are passed as a list to the asyncio.gather() method. coroutine_1() raises a ValueError exception, whereas coroutine_3() raises a SystemError exception.

Python
 
import asyncio
import sys


async def coroutine_1():
   await asyncio.sleep(2)
   raise ValueError(sys._getframe().f_code.co_name + " failed with ValueError")
   return("coroutine_1 finished.")


async def coroutine_2():
   await asyncio.sleep(2)
   return("coroutine_2 finished.")


async def coroutine_3():
   await asyncio.sleep(2)
   raise SystemError(sys._getframe().f_code.co_name + " failed with SystemError")
   return("coroutine_3 finished.")


async def coroutine_4():
   await asyncio.sleep(2)
   return("coroutine_4 finished.")


async def main(return_exceptions_val):
   try:
       results = await asyncio.gather(
           coroutine_1(), coroutine_2(), coroutine_3(), coroutine_4(),
           return_exceptions = return_exceptions_val
       )
       print(results)
   except ValueError as e:
       print("Value Error raised.")


print("Running with return_exceptions = False")
asyncio.run(main(return_exceptions_val = False))


print("\nRunning with return_exceptions = True")
asyncio.run(main(return_exceptions_val = True))


When return_exceptions in the gather() method is set to False, coroutine_1 raises ValueError, which is propagated to main() where gather() is awaited, so the results of coroutine_2, coroutine_3, and coroutine_4 are never collected. The execution story is entirely different when return_exceptions is set to True. Here, all four coroutines complete the execution even though ValueError and SystemError are raised by coroutine_1 and coroutine_3, respectively.

ValueError & SystemError 

With return_exceptions set to True, exceptions raised are provided as a return value in the list returned from the gather() method of the Python asyncio library.

Async/Await in Asyncio

We have used the async/await combination extensively throughout this Python asyncio tutorial. To put it in simple terms, async/await are the guiding pillars for realizing concurrent code execution using the Python asyncio library. The async keyword converts a Python function into a coroutine function, and calling it returns a coroutine object that is run by the event loop. What this essentially means is that the coroutine can momentarily pause its execution under the following circumstances:

  • Waiting for I/O operations - making network requests, interacting with databases, and more.
  • Waiting for external events - specific test conditions before proceeding with actions, monitoring and logging server-side issues, and more.
  • Achieving better concurrency - yielding control to the event loop when there are waits (or sleep), running multiple coroutines using asyncio.gather().

Coroutines can pause their execution using the await keyword. The await keyword suspends the currently executing coroutine, and control is yielded to the event loop. The suspended coroutine/task is scheduled for execution again when the awaited operation (e.g., I/O, timer expiry) completes. With the current task suspended, the event loop now schedules and executes coroutines that are ready for execution. Once the awaited task is completed, the earlier suspended coroutine resumes execution from the point where it was paused. Now that we have covered the major aspects of the Python asyncio library, let's look at an example that showcases the usage of the async/await keywords in Python:

Python
 
# Demonstration of asyncio with Python (Pytest is the automation framework)


# Includes the following:


# Usage of aiohttp
# Usage of asyncio.gather
# Marking tests as async using the @pytest.mark.asyncio marker


import pytest
import aiohttp
import asyncio
import json
import ssl
import os
import sys
from dotenv import load_dotenv
import certifi


load_dotenv()


user_name = os.getenv('LT_USERNAME')
api_key = os.getenv('LT_ACCESS_KEY')


# Inspiration - https://stackoverflow.com/questions/53199248/get-json-using-python-and-asyncio
async def get_top_reddit_threads(subreddit, session):
   url = f"https://www.reddit.com/r/{subreddit}/top.json?sort=top&t=day&limit=20"


   # Reference JSON - https://www.reddit.com/r/Playwright/top.json?sort=top&t=day&limit=20
   data = await get_json(session, url)


   if data:
       data_decoded = json.loads(data.decode('utf-8'))
       print(f'\nReddit details for {subreddit}')
       print(f'____________________________\n')
       for post in data_decoded['data']['children']:
           score = post['data']['score']
           title = post['data']['title']
           link = post['data']['url']
           if score and title and link:
               print(f'Score: {score}  |  Title: {title}  |  Link: ({link})')


# Fetch JSON data from a URL


async def get_json(session, url):
   headers = {"accept": "application/json"}
   try:
       async with session.get(url, headers=headers) as response:
           # Response 200 - We have the data!
           assert response.status == 200
           return await response.read()
   except aiohttp.client_exceptions.ClientConnectorCertificateError as e:
       print(f"SSL Certificate Error: {e}")
       return None
   except Exception as e:
       print(f"Error fetching data: {e}")
       return None


# Refer LambdaTest API documentation - https://www.lambdatest.com/support/api-doc/


async def get_lambdatest_sessions(session):
   url = f"https://{user_name}:{api_key}@api.lambdatest.com/automation/api/v1/sessions?limit=40"
   data = await get_json(session, url)


   if data:
       data_decoded = json.loads(data.decode('utf-8'))
       for test in data_decoded['data']:
           test_id = test['test_id']
           build_name = test['build_name']
           status_ind = test['status_ind']
           print(f"Build: {build_name}  |  ID: {test_id}  |  Status: {status_ind}")         


@pytest.mark.asyncio
async def test_fetch_lambdatest_sessions():
   ssl_context = ssl.create_default_context(cafile=certifi.where())
   async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
       await get_lambdatest_sessions(session)


@pytest.mark.asyncio
async def test_fetch_reddit_threads():
   ssl_context = ssl.create_default_context(cafile=certifi.where())
   async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
       subreddits = ['Selenium', 'Playwright', 'Python', 'asyncio']
       tasks = [get_top_reddit_threads(subreddit, session) for subreddit in subreddits]
       # Gather the tasks using gather() method of asyncio
       await asyncio.gather(*tasks)


In the above example, we have two async functions:

  • test_fetch_lambdatest_sessions() - Fetches details of the sessions of the tests executed on LambdaTest.
  • test_fetch_reddit_threads() - Fetches top Reddit threads matching certain topics using the API provided by Reddit.


Let's deep dive into the integral aspects of the code. To get started, we import all the essential libraries and modules: pytest, asyncio, aiohttp, and others used for asynchronous programming, test execution, and more. Since the LambdaTest and Reddit APIs provide output in JSON format, the json module is also imported into the code. Since LambdaTest APIs are used in the tests, the username and access key obtained from the LambdaTest Profile > Password and Security section are exposed as the LT_USERNAME and LT_ACCESS_KEY environment variables, respectively.

There are five async functions (or coroutines) in the example, of which three are helper functions used by the respective tests:

1. Coroutine 1 (Helper): get_json() 

The above coroutine fetches the data in a JSON format using the respective URL. First, we create a dictionary containing the accept header, which specifies that the client expects a response in the JSON format. 

Next, an asynchronous GET request is made to the specified URL (i.e., the LambdaTest or Reddit API), including the headers. The status attribute of the response object indicates whether the request was successful. Any response other than 200 means that the client's request to the server was unsuccessful. The read() method of the response object returns the content of the response. The try…except block is used to catch and handle exceptions.

2. Coroutine 2 (Helper): get_lambdatest_sessions() 

This particular coroutine returns the meta-data related to test sessions for the tests executed on the LambdaTest platform. We are using the LambdaTest REST API to fetch session details for the last 40 tests executed on LambdaTest. 

In order to use LambdaTest, you need to create an account and export the environment variables LT_USERNAME and LT_ACCESS_KEY mentioned earlier. Since we need session information for only the last 40 sessions, the limit parameter in the LambdaTest sessions API is set to 40. LambdaTest user name and access key are appended to the URL for authentication purposes. 

Next, the HTTP session object and constructed URL are supplied as input parameters to the get_json() coroutine that we discussed earlier. The get_json() coroutine returns the LambdaTest session meta-data in JSON format. Once we have the byte-encoded data, it is decoded into a UTF-8 string. The loads() method of the json module parses the JSON string into a Python dictionary. Once we have the data in the JSON-decoded dictionary, we loop through each item in the data key.

LambdaTest sessions API response

As seen from the API response, each test session comprises a unique test_id, a build_name, and a status_ind indicator. All these respective entries for each test session are printed on the console.

3. Coroutine 3 (Helper): get_top_reddit_threads() 

This helper function provides the top Reddit topics for a particular subreddit (e.g., 'Selenium', 'Playwright', 'Python', 'asyncio'). The subreddit is passed as an argument to the get_top_reddit_threads() coroutine. Like the other helper functions, we first construct the URL along with the query parameters sort=top, t=day, and limit=20 to get the top 20 posts of the day. 

The get_json() helper is invoked here as well to perform the GET request on the URL created in the earlier step. The byte-encoded response data is then decoded into a UTF-8 string. The loads() method parses the JSON string into a Python dictionary. Now that we have the decoded JSON, we loop through each item in the children list, which is under the data key. Each item represents a Reddit post. For verification, head over to the Python subreddit's top.json URL, where the score, title, and url entries can be seen in the children list under the data key.

Reddit top.json API response

The meta-data (i.e., score, title, and sub-reddit link) are finally printed on the console. With the helper functions all covered, let’s look at the test functions/coroutines used in the example:

Coroutine 1 (Test Function): test_fetch_lambdatest_sessions() 

Since this is an asynchronous test function (or coroutine), it is marked with the @pytest.mark.asyncio decorator. With this, the test_fetch_lambdatest_sessions() function is executed as an asyncio task in the event loop provided by pytest-asyncio. 

Next, a default SSL context is created using the create_default_context() method of the SSL library. During the implementation, we came across a few errors related to SSL verification, and the Stack Overflow thread on SSL in Python helped resolve those errors. 

The path to the system CA certificate bundle is provided via the certifi.where() method. The certificate is then passed to the SSL context to ensure a secure connection. 

ClientSession is the entry point for all the client API operations. Since we are using custom SSL parameters, an ssl.SSLContext instance is created and used for the entire session with ClientSession(connector=TCPConnector(ssl=ssl_context)). The connector parameter in ClientSession is set to aiohttp.TCPConnector with the ssl parameter set to ssl_context. With this, the session uses the SSL context (ssl_context) for secure connections. 

Finally, the helper function/coroutine get_lambdatest_sessions() is invoked with the newly created session passed as a parameter to it. As seen from the example, we are creating a new session object for each test; this could be further optimized by using pytest fixtures with asyncio, as sketched below.
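As a rough sketch of that optimization (not part of the original example), a pytest-asyncio fixture could provide a shared aiohttp ClientSession to async tests. The fixture name http_session and the example URL are hypothetical; in the article's example, the test would await get_lambdatest_sessions(http_session) instead.

Python
 
import ssl

import aiohttp
import certifi
import pytest
import pytest_asyncio


@pytest_asyncio.fixture
async def http_session():
    # One aiohttp ClientSession shared by the test instead of being created inline
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
        yield session  # the session is closed automatically after the test finishes


@pytest.mark.asyncio
async def test_session_fixture(http_session):
    # Any async test can now reuse the shared session; the URL here is illustrative
    async with http_session.get("https://www.example.com") as response:
        assert response.status == 200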

Coroutine 2 (Test Function): test_fetch_reddit_threads() 

Most of the implementation of test_fetch_reddit_threads() remains the same as that of test_fetch_lambdatest_sessions(), barring a few changes. First, we create a list of all the subreddits (i.e., Selenium, Playwright, Python, and asyncio) whose top threads need to be fetched. Next, we create a list of coroutine objects by invoking the helper get_top_reddit_threads() for each subreddit in the list. The session object created in the earlier step is passed to the helper function along with the subreddit. All four tasks run concurrently via the gather() method of the Python asyncio library. The return_exceptions parameter in asyncio.gather() is left at its default of False, which means that an exception raised by any task is propagated immediately to the awaiting test.

Execution 

Invoke the following command on the terminal to execute the tests: pytest --verbose --capture=no tests/sample-examples/5_async_await.py. As seen in the execution snapshot, both tests executed successfully, and the last 40 LambdaTest test sessions and the top 20 Reddit threads per subreddit are printed on the console.

Execution results: LambdaTest test sessions and Reddit threads


To summarize, the combination of async/await, tasks, and more can be leveraged to handle concurrent execution and free up the CPU when I/O bound operations, network requests, etc. are in progress. All of this allows the application to remain responsive while accelerating the speed of test execution.

How to Make Python Code Asynchronous?

Before getting into how to make the code asynchronous, it is important to identify the scenarios for which async should be preferred over the sync mode. Here are some considerations that should be taken into account when opting for asynchronous code in Python:

Identify Operations Involving External Resources

Until now, it is evident that asynchronous execution is the way to go when the application involves operations with external resources: network requests, database queries, I/O, etc. In such scenarios, the CPU is less loaded, which allows it to pick up other tasks that require its attention. Here, we are not referring to CPU-bound tasks, for which Python asyncio has to be integrated with executors to improve application responsiveness and performance. For blocking I/O, Python asyncio can be used with ThreadPoolExecutor to offload tasks from the asyncio event loop, while Python asyncio with ProcessPoolExecutor offers the benefits of parallelism for CPU-bound tasks by making the best use of multiple CPU cores.
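As a hedged sketch of the executor idea, the snippet below offloads an illustrative CPU-heavy function to an executor via loop.run_in_executor(); the function name and workload are assumptions. Swapping ThreadPoolExecutor for ProcessPoolExecutor gives process-based parallelism for truly CPU-bound work.

Python
 
import asyncio
import concurrent.futures
import hashlib


def cpu_bound_digest(payload: bytes) -> str:
    # CPU-heavy (or blocking) work that would otherwise stall the event loop
    return hashlib.sha256(payload * 100_000).hexdigest()


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # The event loop stays free to run other coroutines while the pool does the work
        digest = await loop.run_in_executor(pool, cpu_bound_digest, b"payload")
        print(digest)


if __name__ == '__main__':
    asyncio.run(main())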

Concurrent Test Execution

Secondly, asynchronous execution should be opted for if the application has independent tasks that can be executed concurrently. We looked into one such scenario where we made multiple API calls (i.e., LambdaTest APIs and Reddit APIs) in parallel, and async/await and tasks helped in faster test execution. Libraries like Python asyncio, aiohttp, aiomysql, aiopg, etc., can be leveraged to concurrently execute applications built using the event-driven microservices architecture. 

Event-driven async communication


You can also refer to our blog on Microservices Design Principles for a quick refresher on design patterns that best suit your microservices-based application.

Mark Functions as Coroutines

Functions with I/O-bound operations are the ideal contenders for asynchronous execution. These functions can be converted into coroutines using the async/await keywords. 

Mark Functions As Coroutines 

Replacing blocking calls with await allows other tasks to be executed while the awaited operation is in progress. As seen above, two simple tests that sleep for 2 seconds execute concurrently using the gather() method of the Python asyncio library. While the sync equivalent of the above snippet takes 6 seconds to execute, the async variant executes in 4 seconds.

Sync test execution - 6 seconds

Async test execution - 4 seconds


To summarize, it is recommended to accelerate execution time with asyncio by replacing tasks/functions that involve operations pertaining to I/O, servers, databases, and more.
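The timings above come from the repository's screenshots. As a rough, self-contained sketch of the same idea (the function names and the exact timings here are illustrative, not the code behind the screenshots), replacing a blocking sleep with an awaitable one and gathering the coroutines cuts the total wall-clock time:

Python
 
import asyncio
import time


def fetch_sync():
    # Blocking version: each call stalls the thread for its full duration
    time.sleep(2)


async def fetch_async():
    # Async version: await hands control back to the event loop during the wait
    await asyncio.sleep(2)


async def main():
    start = time.perf_counter()
    for _ in range(3):
        fetch_sync()  # three blocking calls run back to back: ~6 seconds in total
    print(f"Sync total: {time.perf_counter() - start:.1f} second(s)")

    start = time.perf_counter()
    # Three awaitable sleeps overlap on the event loop: ~2 seconds in total
    await asyncio.gather(fetch_async(), fetch_async(), fetch_async())
    print(f"Async total: {time.perf_counter() - start:.1f} second(s)")


if __name__ == '__main__':
    asyncio.run(main())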


Demonstration: Asyncio in Python

Now that we have covered most of the essential concepts of Python asyncio, let’s dive deep into real-world scenarios. Before doing the same, let’s set up the project and execution environment.

Project Structure

The tests demonstrating the usage of the Python asyncio library are located in the tests folder. Let's do a deep dive into the project structure:

  • pageobject - Contains locators used in the respective tests. Primarily created for realizing the needs of Page Object Model in Selenium Python.
  • tests/fetching_pokemon_names - Sync and async fetching of Pokemon information using Pokemon APIs.
  • tests/fetching_weather_information - Sync and async fetching of current weather of US cities using OpenWeather APIs.
  • tests/get_automation_builds - Fetching metadata of sessions created for running tests on the LambdaTest Cloud Grid. LambdaTest Sessions API is used to fetch the respective information.
  • tests/url_health_checking - Sync and async implementation for checking the health of links present on LambdaTest Selenium Playground.
  • tests/web_scraping - Scraping of items on LambdaTest eCommerce Playground using sync and async programming in Python.

Apart from the above-mentioned directories, the project also contains the following files in the project’s root directory:

  • conftest.py - Configuration file in pytest used for sharing fixtures, hooks, and other configuration settings across test files in the suite.
  • Makefile - Contains commands used for executing tests in sync and async mode in Python.
  • pyunitsetup.py - Implementation for setting up the browser instance, setUp, and tearDown functions.
  • requirements.txt - Contains a list of packages or libraries (e.g., bs4, aiohttp, etc.) required for the implementation and execution.

As stated earlier, it is recommended to have Python 3.4 (or later) since the Python asyncio library is available out of the box in those versions of Python.

Project Prerequisites

It is recommended to use a virtual environment (venv) since it helps in better management of dependencies and environments. In a nutshell, the virtual environment isolates the packages from the base environment. 

Note: Please replace pip3 with pip depending on the pip version installed on your machine. 

Run virtualenv venv on the terminal to create the virtual environment and source venv/bin/activate to activate it. Now that the virtual environment is ready, let's install the required libraries present in requirements.txt. Invoke the command pip3 install -r requirements.txt on the terminal. Here is the list of libraries that we have installed for the demonstration:

  • pytest-xdist - Helps realize parallel test execution with pytest (https://pypi.org/project/pytest-xdist/)
  • pytest-asyncio - Provides support for coroutines as test functions (https://pypi.org/project/pytest-asyncio/)
  • requests - HTTP library used for making HTTP requests in a synchronous manner (https://pypi.org/project/requests/)
  • pytest-order - Allows customization of the order in which pytest tests are executed (https://pypi.org/project/pytest-order/)
  • bs4 - Allows scraping of information from HTML and XML documents (https://pypi.org/project/beautifulsoup4/)
  • aiohttp - Asynchronous HTTP client/server framework used for accelerated web scraping in Python (https://pypi.org/project/aiohttp/)
  • python-dotenv - Reads key-value pairs from a .env file (https://pypi.org/project/python-dotenv/)


In the interest of time, we will be deep-diving into the following scenarios in the further sections of this Python asyncio tutorial:

  • Web scraping
  • URL health checking
  • Fetching weather information
  • Getting session details on LambdaTest

We will be benchmarking async with sync using the Hyperfine command-line tool. In case you are using macOS, run the command brew install hyperfine on the terminal to install Hyperfine in the execution environment. At the time of writing this Python asyncio tutorial, the latest version of Hyperfine is 1.18.0. Run the command hyperfine --help in case you need more information about the usage of Hyperfine.

Asynchronous Web Scraping in Python

Web scraping in Python is one of the popular use cases where libraries like requests, BeautifulSoup (bs4), etc., can be leveraged to scrape information from a document. 

In this Python web scraping repo, I have used the synchronous approach to scraping. Though synchronous web scraping with requests and bs4 does the job well, it might falter in performance (or scraping time) if information has to be scraped from a large number of pages. 

We will be scraping content from the LambdaTest eCommerce Playground as done in the repo, where we have used bs4 and requests for sync web scraping. Since we will be benchmarking sync vs. async performance, we first scrape content on the eCommerce Playground using the sync approach (using the bs4 and requests libraries in Python).

Implementation (Synchronous Web Scraping in Python)

Synchronous web scraping implementation


For simplification, we will be porting the sync code to its async equivalent by doing the following modifications:

Porting Sync Implementation to Asyncio
  1. The requests library is replaced with aiohttp - asynchronous HTTP Client/Server for asyncio and Python. It will be useful in fetching information in a concurrent manner.
  2. All the functions are now converted into coroutines using the async keyword
  3. The combination of async/await is used to ensure that coroutines are not blocked when we are making HTTP requests or an I/O operation is in progress.
  4. Multiple pages are fetched together using the gather() method of the Python asyncio library.
  5. Finally, we use the asyncio.run() function to run the main function asynchronously.


Though BeautifulSoup/bs4 is not recommended due to its synchronous nature, we are using it to simply parse the HTML content from the eCommerce Playground. 

Implementation (Asynchronous Web Scraping in Python)

Python
 
import asyncio
import aiohttp
import sys
import ssl
import certifi
import time
from pprint import pprint
from bs4 import BeautifulSoup


sys.path.append(sys.path[0] + "/../../")


from pageobject.locators import locators
from pageobject.helpers import helpers


async def fetch(url, session):
   async with session.get(url) as response:
       return await response.text()


# Encountered the below error
# aiohttp.client_exceptions.ClientConnectorCertificateError: Cannot connect to
# host ecommerce-playground.lambdatest.io:443 ssl:True
# [SSLCertVerificationError: (1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed:
# unable to get local issuer certificate (_ssl.c:1000)')]


# Solution: https://stackoverflow.com/a/66842057/126105


async def scrap_ecommerce(url):
   ssl_context = ssl.create_default_context(cafile=certifi.where())
   async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
       html = await fetch(url, session)
       soup = BeautifulSoup(html, 'html.parser')


       rows = soup.select('.product-layout.product-grid.no-desc.col-xl-4.col-lg-4.col-md-4.col-sm-6.col-6')
       meta_data_arr = []


       for row in rows:
           link = row.find("a", class_='carousel d-block slide')
           name = row.find("h4", class_='title')
           price = row.find("span", class_='price-new')


           meta_data_dict = {
               'product link': link.get('href'),
               'product name': name.get_text(),
               'product price': price.get_text()
           }


           meta_data_arr.append(meta_data_dict)


       return meta_data_arr


async def main():
   start_time = time.time()
   base_url = locators.test_bs4_url
   tasks = [scrap_ecommerce(f"{base_url}&page={i}") for i in range(1, 6)]
   results = await asyncio.gather(*tasks)


   for i, result in enumerate(results, 1):
       print(f"Product Page = {base_url}&page={i}")
       print("*********************************************************************************************************")
       helpers.print_scrapped_content(result)
       print()
  
   print("\nTime elapsed is " + str((time.time() - start_time)) + " seconds")


if __name__ == '__main__':
   output = asyncio.run(main())


To get started, we first import all the required libraries in the project. We came across aiohttp.client_exceptions.ClientConnectorCertificateError, and the solution we discovered is in this Stack Overflow thread. Hence, the certifi library, which validates the trustworthiness of SSL certificates, is imported into the code. The aiohttp and bs4 libraries are also imported for asynchronous HTTP communication with Python asyncio and for parsing HTML content, respectively.

Like the earlier example of sync web scraping, the locators are separated from the core implementation to make the ideal use of the Page Object Model in Python. On similar lines, helpers.py contains the helper functions that would be used in the tests. All the functions in the async-based implementation are marked with the async def keyword. 

The scrap_ecommerce() coroutine does the scraping of the content from the LambdaTest E-Commerce Playground. The URL to be scraped is passed as a parameter to the coroutine. As mentioned earlier, we encountered SSL certificate verification errors. To counter the same, we have created a custom SSL context that includes a trusted certificate authority (CA) bundle. It is used to verify SSL certificates. 

The create_default_context() method of the SSL library creates a new SSL context with the default settings. certifi.where() returns the path to the CA bundle, a file that contains root and intermediate certificates. The custom SSL context (i.e., ssl_context) will be used in further sections of the code. 

The ssl_context created in the earlier step is now passed to the TCPConnector() method of the aiohttp library for creating a ClientSession (named session). HTTP requests made within the session are always secure with the trusted CA bundle. 

The fetch() coroutine is a helper that takes the URL to be scraped and the currently active session as input parameters. It asynchronously fetches data (with an async HTTP GET request) from the specified URL using the aiohttp library. With await response.text(), the reading of the response content is awaited, and the result is returned as an HTML string. BeautifulSoup(html, 'html.parser') returns a BeautifulSoup object that is used further for scraping.

As mentioned earlier, this is not a time-consuming operation; hence, bs4 is used in asynchronous web scraping. The select() method of BeautifulSoup (bs4) finds elements using the CSS Selector property: .product-layout.product-grid.no-desc.col-xl-4.col-lg-4.col-md-4.col-sm-6.col-6. 

As seen below, 15 elements match the CSS Selector, as there are 15 products on the said product page. The method returns a list that is used later for scraping meta-data (i.e., name, price, description, etc.) of every product on the page. 

Product elements matching the CSS selector on the product page

It is important to note that the same logic is also used for synchronous web scraping. A loop is run for scraping information of all the 15 products (or elements) under the div located in the earlier step. The product link is obtained by locating the element using the find() method of bs4. The first argument is the tag that needs to be searched for (i.e., ‘a’ – anchor tag), and the second is the CSS Class attribute. 

Along similar lines, the price of the product/element is obtained by locating the element using the find() method of bs4 along with the class selector (i.e., price-new). The get_text() method of bs4 provides the product name and price when used with the respective elements.

Every dictionary entry (i.e., meta_data_dict, representing the product link, product name, and product price) is finally appended to a list in Python. Since we have to scrape product information from Page 1 through Page 5, we first create tasks for scraping those pages in a concurrent fashion.

The tasks created in the earlier step are run concurrently by invoking the gather() method of the Python asyncio library. The return_exceptions parameter is left at its default of False, which means that a failure while scraping any page is propagated immediately to the awaiting main() coroutine.

The final result is available in the results list. 

Asynchronous scraping (using gather)



It is the above step that leverages the advantages offered by tasks in Python asyncio and the aiohttp library for concurrent scraping of multiple pages (Page 1 through Page 5). Shown below is its sync equivalent, where the scrap_ecommerce() method is called for every page, but in a synchronous manner.

Synchronous scraping





Now that the content is scraped, we invoke the helper print_scrapped_content() for printing the scraped content on the console. asyncio.run() is invoked to run the main() coroutine until it completes.

Benchmarking: Sync and Async Web Scraping 

Invoke the command make perform-web-scraping for benchmarking the scraping use case using the Hyperfine command-line utility. The warmup option in Hyperfine is set to 3. Hence, the actual benchmarking starts after three warm-up runs. The show-output option in Hyperfine shows the command output/execution on the terminal. 

Benchmarking


As seen from the benchmarking results, asynchronous web scraping is close to 2.93 times faster than its synchronous counterpart. Though the number looks a tad smaller, it is something that can make a huge performance impact when used in scraping a large number of documents (or pages).

FastAPI With Asyncio for High-Performance APIs

FastAPI is a popular modern and high-performance web framework that is used to build APIs with Python. The framework is designed to optimize the overall developer experience so that you can build production-ready APIs while keeping the best practices in mind. As stated in the FastAPI official documentation, FastAPI's performance is on par with that of NodeJS and Go. It is built on open standards for APIs, i.e., OpenAPI (earlier known as Swagger) and JSON Schema. Like other Python frameworks, FastAPI also increases developer productivity and minimizes duplication of code. 

FastAPI, when combined with Python asyncio, results in improved throughput and faster response times, as Python asyncio helps better handle simultaneous incoming requests. This makes the application more scalable, as Python asyncio effectively manages the system resources. With Python asyncio, you can build high-performance and low-latency APIs that are capable of handling high loads with ease.

To install FastAPI, you need to trigger pip3 install fastapi uvicorn on the terminal. uvicorn is the server that will use the API you build to serve requests. 

pip3 install fastapi uvicorn 

At the time of writing this Python asyncio tutorial, the latest versions of FastAPI and uvicorn are 0.112.2 and 0.30.6 respectively. 

latest versions of FastAPI & uvicorn  

It is recommended to separate the tests from the core application logic, very similar to what we normally do in the Page Object Model in Python. For FastAPI, the core application logic (i.e., routines, dependencies, configurations, etc.) is normally placed in the app folder, whereas the test logic is placed in the tests folder. However, you can have a different directory structure depending on the project requirements. In a nutshell, opt for a directory structure that makes the code more maintainable and scalable in the long run. 

Demonstration: FastAPI With Asyncio 

In order to demonstrate the capabilities of FastAPI with Python asyncio, we will develop a simple FastAPI application that interacts with the LambdaTest APIs. The APIs are used for fetching details associated with the builds and sessions executed from my account. The core application logic is in app/main.py, and the tests that use the application logic are placed in tests/test_main.py. The APIs that fetch the build and session information asynchronously are located in tests/fastAPI/app/main.py.

Let’s look at some of the most important aspects of the code! First, we import the FastAPI class, which is used for creating the FastAPI application. An instance of that class is normally assigned to a variable (e.g., app). The newly created app object is used to define the web application, including endpoints, configurations, and more.

As we are fetching information from LambdaTest using LambdaTest REST APIs, the username and access key are read from the environment variables LT_USERNAME and LT_ACCESS_KEY. As stated earlier, an object (named app) of the FastAPI class is created using app = FastAPI().

As seen in the implementation, the @app.get("/") decorator is used to define a route/endpoint that responds to GET requests at the root URL ("/"). In our case, it is @app.get("/builds/") and @app.get("/sessions/"). get_lambdatest_all_builds() and get_lambdatest_all_sessions() are two asynchronous methods for fetching build- and session-related information, respectively.

We first construct the URL with a combination of user_name, access_key, and the LambdaTest API. The limit parameter is set to 50 so that details about the first 50 builds are fetched with the API. The request headers are set up to specify that the client expects a JSON response from the server.

Next, an asynchronous context manager (async with) is used to make a GET request to the LambdaTest API with the session object. It is an instance of an HTTP client session (e.g., aiohttp.ClientSession). The status code of the response object is checked. If the request is successful, the status of the response is 200. An exception is raised, and the error message returned by the server is printed if the execution is not successful.

As stated in the FastAPI official documentation, the @app.get("/builds") route decorator tells FastAPI that the function right below it is responsible for handling requests that go to the path /builds/ using a GET operation. First, an asynchronous HTTP client session (ClientSession) is created using the aiohttp library.

As seen earlier, the ClientSession object is used to manage HTTP requests within the session. The session object is passed to the get_lambdatest_all_builds() method. It returns data in a JSON format if the LambdaTest API returns build information. The data field is extracted from the JSON response (i.e., builds_data). An empty list is returned if the data field does not exist.

The uvicorn.run(app, host="0.0.0.0", port=8000) command runs the FastAPI application (i.e., app) that was created earlier using the app = FastAPI() method. The argument host="0.0.0.0" tells Uvicorn to listen to all available IP addresses. The other argument, port=8000, specifies the port on which the server should listen for incoming requests.
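
Putting the pieces described above together, a condensed sketch of the application might look like this (the LambdaTest endpoint URL and response fields are assumptions based on the description; only the builds route is shown):

Python
 
# Condensed sketch of app/main.py (illustrative; not the original listing)
import os

import aiohttp
import uvicorn
from fastapi import FastAPI

app = FastAPI()

user_name = os.getenv("LT_USERNAME")
access_key = os.getenv("LT_ACCESS_KEY")


async def get_lambdatest_all_builds(session: aiohttp.ClientSession):
    # Assumed LambdaTest Automation API endpoint; limit=50 fetches the first 50 builds
    url = f"https://{user_name}:{access_key}@api.lambdatest.com/automation/api/v1/builds?limit=50"
    headers = {"Accept": "application/json"}
    async with session.get(url, headers=headers) as response:
        if response.status != 200:
            # Print the error message returned by the server and raise an exception
            print(await response.text())
            raise Exception(f"LambdaTest API request failed with status {response.status}")
        return await response.json()


@app.get("/builds/")
async def builds():
    async with aiohttp.ClientSession() as session:
        builds_data = await get_lambdatest_all_builds(session)
        # Extract the 'data' field; fall back to an empty list if it is absent
        return builds_data.get("data", [])


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)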

Now that we have covered the methods that are part of the business logic, let’s look at the test methods used for testing the APIs implemented earlier. The test methods are part of tests/test_main.py.

We import the TestClient class from the fastapi.testclient module. TestClient helps simulate requests to the FastAPI application and receive responses from the application. After the app directory (i.e., tests/app) is added to the system path, the FastAPI application instance/object (app) is imported from the module named main.

Next, we create a TestClient for the FastAPI application by invoking client = TestClient(app). In the application code, we created two routes/endpoints: builds and sessions. Here in the test code, we simulate an HTTP GET request to the respective endpoints (builds and sessions).

If the status of the response is OK (i.e., 200), we parse the body of the HTTP response as JSON. A Python assert is raised if the value associated with the dashboard_urls key is not of type list.
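
A sketch of the corresponding test along the lines described above (the path handling and the dashboard_urls key follow the article's description and may need to be adapted to your response shape):

Python
 
# Sketch of tests/test_main.py (illustrative)
import sys

from fastapi.testclient import TestClient

sys.path.append(sys.path[0] + "/app")  # make the app module importable (assumed layout)
from main import app

client = TestClient(app)


def test_get_all_builds():
    # Simulate an HTTP GET request to the /builds/ endpoint
    response = client.get("/builds/")
    if response.status_code == 200:
        resp_json = response.json()
        # Assert that the value associated with the dashboard_urls key is a list
        assert isinstance(resp_json["dashboard_urls"], list)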

With this, we are all set to execute the FastAPI Python asyncio tests. The tests will run on the LambdaTest platform. It is an AI-powered test execution platform that allows developers and testers to run Python automated tests at scale across various operating systems and web browsers online.

After exporting the environment variables LT_USERNAME and LT_ACCESS_KEY, run the command make fast-api-asyncio on the terminal to execute the tests implemented in tests/test_main.py. As seen below, the details of the builds and sessions associated with my LambdaTest account are printed successfully.



What we have demonstrated here is just scratching the surface as far as FastAPI with Python asyncio is concerned. Their combination can be harnessed to develop high-performing, scalable, efficient, and fast web applications.

Asynchronous URL Health Check in Python

One of the popular use cases of Python asyncio is checking the health of a web service (or endpoint) by periodically analyzing responses received from requests sent to a specified URL. It can also be used for API testing, leveraging different HTTP methods (e.g., GET, POST, PUT, DELETE) supported by the respective API. Website monitoring and Service Level Agreements (SLAs) are other prominent use cases of URL health checking with Python.

In all cases, the first and foremost step is to check the availability of the URL and verify whether the response received has a status code of STATUS_OK (or 200). To demonstrate the usage of Python asyncio, we will perform a health check of the URLs present in the LambdaTest Selenium Playground. Similar to the previous example, we will benchmark the performance of synchronous and asynchronous implementations.

Implementation (Synchronous URL Health Checking in Python)


First, we scrape all the URLs, which are stored in the meta_data_arr array. Now that the URLs are available, a for loop iterates through every URL in meta_data_arr. For each URL, an HTTP GET request is sent by invoking the get() method of the requests library, and the status code of the received response is stored in the status_code variable. As mentioned earlier, the URL is healthy (or reachable) if the response to the GET request is 200. For instance, the link to Ajax Form Submit is reachable and should return a response of 200 when requested using the GET method. The same principle applies to all the other links present in the Playground. Finally, an assert is raised if the URL is not reachable (i.e., the status code is not 200).
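
In a nutshell, the synchronous flow boils down to something like this (a sketch; driver and the helpers module come from the test scaffolding used throughout this tutorial):

Python
 
# Sketch of the synchronous URL health check (illustrative)
import requests

# meta_data_arr holds the URLs scraped from the LambdaTest Selenium Playground
meta_data_arr = helpers.scrap_playground_url(driver)

for url in meta_data_arr:
    status_code = requests.get(url).status_code  # one blocking GET per URL
    print(url + " status = " + str(status_code))
    assert status_code == 200, f"Failed for URL: {url}, Status Code: {status_code}"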

Implementation (Asynchronous URL Health Checking in Python) 

Let’s port the existing code such that it runs asynchronously. Here are the top-level changes in the implementation:

  • [Optional] Marking the tests with the @pytest.mark.asyncio decorator. However, this step is optional as we have already added asyncio_mode = auto in pytest.ini (see the snippet after this list). You can refer to this Stack Overflow thread for more information.
  • Replacing synchronous requests library with asynchronous aiohttp library for performing multiple asynchronous HTTP requests concurrently.
  • Using the gather() method of the Python asyncio library to run multiple tasks concurrently.
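
For completeness, the relevant setting in pytest.ini is just the following:

Plain Text
 
[pytest]
asyncio_mode = auto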

Shown below is the complete implementation of Asynchronous URL health checking in Python:

Python
 
import sys
import time
import asyncio

import aiohttp
import pytest

sys.path.append(sys.path[0] + "/../../")

from pageobject.locators import locators
from pageobject.locators import *
from pageobject.helpers import helpers
from pageobject.helpers import *


class TestAsyncHealthCheckOps:
    @pytest.mark.asyncio
    @pytest.mark.run(order=1)
    async def test_async_url_access(self, driver) -> None:
        start_time = time.time()
        meta_data_arr = []
        driver.get(locators.test_playground_url)
        driver.maximize_window()

        # Scrape all the URLs present on the LambdaTest Selenium Playground
        meta_data_arr = helpers.scrap_playground_url(driver)

        async def check_status(session, url):
            # Fetch the URL asynchronously and return its HTTP status code
            async with session.get(url) as response:
                status_code = response.status
                print(url + " status = " + str(status_code) + " ")
                return status_code

        async with aiohttp.ClientSession() as session:
            tasks = [check_status(session, url) for url in meta_data_arr]
            status_codes = await asyncio.gather(*tasks)

        for status_code, url in zip(status_codes, meta_data_arr):
            assert status_code == 200, f"Failed for URL: {url}, Status Code: {status_code}"

        print("\nTime elapsed is " + str((time.time() - start_time)) + " seconds")


For this scenario, we have used the pytest framework in Python. The execution is performed on a headless Chrome browser since we won’t be interacting with web elements on the page. As we are using pytest, the fixtures and hooks are located in the conftest.py configuration file, which houses the required settings. The environment variable EXEC_ASYNC is set to true for async execution.

The to_thread() method of the Python asyncio library is used for creating Chrome browser instances in a separate thread. The method (i.e., driver()) used for creating a browser instance is marked as a pytest fixture in Python with function scope. Hence, the fixture is invoked for the test methods where it is used.
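
A minimal sketch of such a fixture in conftest.py could look like this (the Chrome options and helper name are assumptions for illustration):

Python
 
# conftest.py sketch (illustrative): the blocking Chrome start-up is offloaded
# to a worker thread via asyncio.to_thread()
import asyncio

import pytest
from selenium import webdriver


def create_headless_chrome():
    options = webdriver.ChromeOptions()
    options.add_argument("--headless=new")  # headless, since no UI interaction is needed
    return webdriver.Chrome(options=options)


@pytest.fixture(scope="function")
async def driver():
    browser = await asyncio.to_thread(create_headless_chrome)
    yield browser
    await asyncio.to_thread(browser.quit)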

Once the headless Chrome instance is created, we first scrape all the URLs present on the LambdaTest Selenium Playground. The helper method scrap_playground_url() returns an array containing the scraped content. Let’s dive into it!

We first locate the entire grid housing the links using the find_element() method in Selenium. The element is located using the XPath Selector:

Python
 
loc_parent_elem = driver.find_element("xpath", "//*[@id='__next']/div/section[2]/div/ul")


The child elements in loc_parent_elem with the class name pt-10 are located using the find_elements() method:

Python
 
loc_list_elems = loc_parent_elem.find_elements("class name", "pt-10")


Like the earlier example, we iterate over each WebElement in the loc_list_elems list. The href attribute of the child element, located using the CSS Selector .text-black.text-size-14.hover\:text-lambda-900.leading-relaxed, contains the link to a page. All the extracted links (final_link) are appended to the meta_data_arr array, which is returned by the helper function.
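
Putting those steps together, the scrap_playground_url() helper might look roughly like this (a sketch based on the locators above; the actual helper lives in the pageobject package):

Python
 
# Sketch of the scrap_playground_url() helper (illustrative)
def scrap_playground_url(driver):
    meta_data_arr = []
    # Parent grid that houses all the Playground links
    loc_parent_elem = driver.find_element("xpath", "//*[@id='__next']/div/section[2]/div/ul")
    # Child list items with the class name pt-10
    loc_list_elems = loc_parent_elem.find_elements("class name", "pt-10")

    for elem in loc_list_elems:
        link_elem = elem.find_element(
            "css selector",
            ".text-black.text-size-14.hover\\:text-lambda-900.leading-relaxed",
        )
        final_link = link_elem.get_attribute("href")
        meta_data_arr.append(final_link)

    return meta_data_arr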

Now that we have scraped the URLs present on the page, the next step is to check whether the URL is reachable. Before that, we create an asynchronous session (session) using the ClientSession class from the aiohttp library. The session is used to manage the HTTP requests and responses.

The check_status() coroutine takes the session (created earlier) and the target URL as input parameters. The method asynchronously fetches data from the specified URL using the aiohttp library. As shown above, the get() method sends an HTTP request to the URL provided as a parameter. The response object, an instance of aiohttp.ClientResponse, contains the client response. The status attribute provides the HTTP status of the response.

Next, we create a list of tasks to run asynchronously. The list iterates over each URL in meta_data_arr and creates a coroutine for the URL by invoking the check_status() coroutine. The gather() method of Python asyncio runs the tasks asynchronously. Tasks are unpacked and passed as separate arguments to gather().

Upon execution, we have a list of status_codes (or response codes) for each URL.

Finally, a for loop iterates over two lists in parallel: the status_codes list and the meta_data_arr list. An assertion fails if the status code for the corresponding URL in meta_data_arr is anything other than STATUS_OK (or 200).

Benchmarking: Sync and Async URL Health Checking 

Invoke the make check-url-health command to benchmark this use case using the Hyperfine command-line utility. The benchmark results are based on 10 successful runs each of the sync and async implementations. The --show-output option in Hyperfine displays the command output/execution on the terminal. 



As seen from the benchmarking results, asynchronous URL health checking is close to 1.70 times faster than its synchronous counterpart. The impact of async URL health checking will be monumental if the page/document (under test) contains a large number of links.

Asynchronous Weather Check in Python

In this example, we will extract weather information about US cities using OpenWeather APIs. Once you create an account on OpenWeather, copy the OpenWeather API key from the API Keys section. After this, create an environment variable by invoking the following command on the terminal:

Plain Text
 
export OPEN_WEATHER_API=<API-KEY>


As mentioned in the official OpenWeather documentation, gathering weather information for a particular city is possible via the following API: 

API: Current Weather Data

Plain Text
 
https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lon}&appid={API key}


  • lat: latitude of the location
  • lon: longitude of the location
  • API key: OpenWeather API key

We tried out the API for a latitude and longitude combination, which provided the weather information in a JSON format. 
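
An abridged response looks roughly like this; only the fields used later (name and main.temp) are shown, and the values are illustrative:

Plain Text
 
{
  "name": "Mountain View",
  "main": {
    "temp": 298.48,
    "humidity": 64
  }
}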


Though we are again scraping data, the difference here is that the data set is relatively large compared to the earlier examples. For demonstration, we first scrape the City Name, Latitude, and Longitude of US cities mentioned in LatLong.net. 


Data from Page 1 through Page 13 is scraped and fed to the OpenWeather API for fetching weather data. Like the previous examples, we use requests and bs4 for sync weather fetching, and aiohttp, bs4, and tasks (from the Python asyncio library) for async fetching of weather data. 

Implementation (Synchronous Weather Fetching in Python)

Python
 
# Beautiful Soup Official Documentation - https://www.crummy.com/software/BeautifulSoup/bs4/doc/


# Import the locators file and the other modules used below
import sys
import os
import time

import requests
from bs4 import BeautifulSoup


sys.path.append(sys.path[0] + "/../../")


from pageobject.locators import locators
from pageobject.locators import *


from pageobject.helpers import helpers
from pageobject.helpers import *
from dotenv import load_dotenv


load_dotenv()


api_key = os.getenv('OPEN_WEATHER_API')


###### Page 1: https://www.latlong.net/category/cities-236-15-1.html ######
start_page = 1
###### Page 13: https://www.latlong.net/category/cities-236-15-13.html ######
# Note: range() excludes last_page, so last_page = 5 covers Page 1 through Page 4;
# increase it to 14 to scrape all 13 pages
last_page = 5


weather_data_arr = []


def scrap_weather_site(url) -> list:
   response = requests.get(url)


   if response.status_code != 200:
       print(f"Unable to fetch the page. Status code: {response.status_code}")
       return None


   soup = BeautifulSoup(response.text, 'html.parser')
   rows = soup.find_all('tr')[1:]


   for row in rows:
       td_tags = row.find_all('td')


       # Extract values/text from all <td> tags
       td_values = [td.get_text(strip=True) for td in td_tags]
       # print(td_values)
       weather_data_dict = {
           'location': td_values[0],
           'latitude': td_values[1],
           'longitude': td_values[2]
       }
          
       weather_data_arr.append(weather_data_dict)


   return weather_data_arr


def get_weather_info(latitude, longitude):
   url = f"https://api.openweathermap.org/data/2.5/weather?lat={latitude}&lon={longitude}&appid={api_key}"


   try:
       response = requests.get(url)
       response.raise_for_status()
      
       weather_data = response.json()
       return weather_data
   except requests.exceptions.RequestException as e:
       print("Error fetching weather information:", e)
       return None


# Pagination - 1:13
###### Page 1: https://www.latlong.net/category/cities-236-15-1.html ######
###### Page 13: https://www.latlong.net/category/cities-236-15-13.html ######
if __name__ == '__main__':
   start_time = time.time()
   for iteration in range(start_page, last_page):
       # test_weather_url = "https://www.latlong.net/category/cities-236-15
       test_url = locators.test_weather_url + "-" + str(iteration) + ".html"
       meta_data_arr = scrap_weather_site(test_url)
       # print("*****************************************************\n")
       # helpers.print_scrapped_content(meta_data_arr)
      
   for value in meta_data_arr:
       # Extract latitude and longitude
       # Example - {'location': 'Durango, CO, USA', 'latitude': '37.270500', 'longitude': '-107.878700'}
       latitude = value['latitude']
       longitude = value['longitude']
       weather_info = get_weather_info(latitude, longitude)
       if weather_info:
           temperature = weather_info["main"]["temp"]
           city_name = weather_info["name"]
           print(f"Temperature in " + city_name + " is: " + str(temperature))
  
   print("\nTime elapsed is " + str((time.time() - start_time)) + " seconds")


Here, we have two primary methods:

  • scrap_weather_site() - Scrapes the latitude, longitude, and city name from LatLong.net
  • get_weather_info() - Uses the OpenWeather current weather data API to fetch weather information using the latitude and longitude obtained in the earlier step

Since Beautiful Soup (bs4) is also used in the async implementation, we will cover the code walkthrough in that section.

Implementation (Asynchronous Weather Fetching in Python) 

Instead of the requests library, we have used the aiohttp library for asynchronous handling of HTTP requests and responses. Also, asyncio.gather() is leveraged to handle multiple tasks asynchronously. Here is the complete implementation of Asynchronous Weather Fetching in Python:

Python
 
import sys
import os
import ssl
import time
import asyncio

import aiohttp
import certifi
from bs4 import BeautifulSoup


sys.path.append(sys.path[0] + "/../../")


from pageobject.locators import locators
from pageobject.locators import *


from pageobject.helpers import helpers
from pageobject.helpers import *
from dotenv import load_dotenv


load_dotenv()


api_key = os.getenv('OPEN_WEATHER_API')


start_page = 1
last_page = 5
weather_data_arr = []


async def fetch_data(url, session):
   async with session.get(url) as response:
       if response.status != 200:
           print(f"Unable to fetch the page. Status code: {response.status}")
           return None
       else:
           return await response.text()


async def scrap_weather_site(url, session):
   html_content = await fetch_data(url, session)
   if html_content is not None:
       soup = BeautifulSoup(html_content, 'html.parser')
       rows = soup.find_all('tr')[1:]


       for row in rows:
           td_tags = row.find_all('td')
           td_values = [td.get_text(strip=True) for td in td_tags]
           weather_data_dict = {
               'location': td_values[0],
               'latitude': td_values[1],
               'longitude': td_values[2]
           }
           weather_data_arr.append(weather_data_dict)


async def get_weather_info(session, latitude, longitude):
   url = f"https://api.openweathermap.org/data/2.5/weather?lat={latitude}&lon={longitude}&appid={api_key}"
   async with session.get(url) as response:
       if response.status != 200:
           print(f"Error fetching weather information for latitude {latitude} and longitude {longitude}.")
           return None
       else:
           weather_info = await response.json()
           return weather_info


async def main():
   # Encountered the below error
   # aiohttp.client_exceptions.ClientConnectorCertificateError: Cannot connect to
   # host ecommerce-playground.lambdatest.io:443 ssl:True
   # [SSLCertVerificationError: (1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed:
   # unable to get local issuer certificate (_ssl.c:1000)')]


   # Solution: https://stackoverflow.com/a/66842057/126105
   # async with aiohttp.ClientSession() as session:
   ssl_context = ssl.create_default_context(cafile=certifi.where())
   async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
       tasks = []
       for iteration in range(start_page, last_page):
           test_url = locators.test_weather_url + "-" + str(iteration) + ".html"
           tasks.append(scrap_weather_site(test_url, session))
       await asyncio.gather(*tasks)


       tasks = []
       for value in weather_data_arr:
           latitude = value['latitude']
           longitude = value['longitude']
           tasks.append(get_weather_info(session, latitude, longitude))
       weather_infos = await asyncio.gather(*tasks)


       for weather_info in weather_infos:
           if weather_info:
               temperature = weather_info["main"]["temp"]
               city_name = weather_info["name"]
               print(f"Temperature in {city_name} is: {temperature}")


if __name__ == '__main__':
   start_time = time.time()
   asyncio.run(main())
   print("\nTime elapsed is " + str((time.time() - start_time)) + " seconds")


Like before, the create_default_context() method of the ssl library is used to create a new custom SSL context. It is then passed to the TCPConnector() method of the aiohttp library for creating a ClientSession (named session), which is used throughout this example. In the main() function, a for loop runs over range(start_page, last_page) to scrape latitude and longitude information from the first few pages of the LatLong website. The page URL format is shown below:

  • Page 1 - https://www.latlong.net/category/cities-236-15-1.html
  • Page 5 - https://www.latlong.net/category/cities-236-15-5.html

Since we have to scrape content from multiple pages, the scraping is performed asynchronously. The scrap_weather_site() method returns a coroutine object that scrapes content from test_url using the session created in the first step. The coroutine object returned by the method is appended to the tasks list.

Let’s take a quick look at the scrap_weather_site() coroutine. It takes the URL (under test) and the current session as input parameters. First, the fetch_data() function is invoked for fetching the HTML content (response.text()) of the URL provided to the method. Now that we have the HTML document, we parse it using the html.parser parser of BeautifulSoup (bs4). The first row in the table contains the field titles, so they can be skipped during parsing. Therefore, [1:] is added when searching for <tr> elements in the parsed HTML. With this, parsing of rows starts from row number 2.

The find_all() method of bs4 is used to collect all the rows matching the query. A for loop iterates through each row. The <td> elements in each row contain the metadata: place name, latitude, and longitude. With td_values = [td.get_text(strip=True) for td in td_tags], the leading (and trailing) spaces are removed from the text of each <td> element. Now that we have the data from every cell, a dictionary (named weather_data_dict) is created with the data. After that, it is appended to the weather_data_arr array.

Now that we have the tasks list, asyncio.gather() is invoked to run all the tasks asynchronously. At this point, we have successfully scraped the latitude and longitude values from the target pages. Next, a for loop is run over weather_data_arr, and get_weather_info() is invoked asynchronously. Let’s dive deep into that particular method.

The get_weather_info() method takes the current session, latitude, and longitude as input parameters. The OpenWeather current weather data API is supplied with the latitude, longitude, and OpenWeather API key, and the JSON response is obtained by making an asynchronous GET request to the API. Calling get_weather_info() returns a coroutine object, which is added to the tasks list via tasks.append(); the coroutines themselves run later when awaited.

Finally, the tasks (or multiple coroutines) are executed asynchronously via the gather() method of the Python asyncio library, and the execution results of the coroutines are aggregated into a single list. Like before, return_exceptions is set to False (the default), which means that the first exception raised by any of the coroutines is propagated immediately. In the JSON response, the temperature (i.e., temp) key is nested inside the main dictionary; hence, weather_info["main"]["temp"] provides the current temperature of the respective US city, while the name field in weather_info provides the city name. Finally, the city name and temperature are printed on the terminal.

Benchmarking: Sync and Async Weather Check 

Invoke the command make fetch-sync-weather-info for fetching weather information of the supplied latitude(s) and longitude(s) synchronously. The operation was completed in approximately 325 seconds. 


Invoke the command make fetch-async-weather-info for fetching weather information of the supplied latitude and longitude asynchronously using aiohttp and asyncio. The operation was completed in approximately 7 seconds. 



We tried benchmarking with a few more execution cycles, and async weather checking was significantly faster than its sync counterpart. 

We have more examples, such as fetching Pokémon names and getting LambdaTest automation session details, available in the GitHub repository. A plugin named pytest-asyncio-cooperative can also be leveraged for cooperative multitasking to run your I/O-bound test suite efficiently and quickly. As stated in the official documentation, the pytest-asyncio plugin is not compatible with the pytest-asyncio-cooperative plugin! Additionally, as demonstrated earlier, FastAPI pairs well with the Python asyncio library; the combination helps reduce latency when handling requests, resulting in faster response times.

It’s a Wrap

Thanks for making it this far; it was definitely a long journey! As covered extensively in the tutorial, Python asyncio offers significant benefits when it comes to concurrent and accelerated test execution, which plays a major role in boosting the application’s performance. If you need to run blocking I/O alongside the Python asyncio event loop, we recommend checking out ThreadPoolExecutor (or ProcessPoolExecutor for CPU-bound tasks), which offloads such work from the event loop. Prominent use cases, like web scraping and operations involving databases, can benefit from asynchronous programming using the Python asyncio library.
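
As a parting sketch, here is how a blocking call can be offloaded to a thread pool from within a coroutine (the file name is a placeholder):

Python
 
# Offloading a blocking call from the asyncio event loop to a thread pool
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_io_task(path):
    # Plain blocking file I/O that would otherwise stall the event loop
    with open(path) as fp:
        return fp.read()


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # run_in_executor() schedules the blocking call on the pool and awaits its result
        content = await loop.run_in_executor(pool, blocking_io_task, "data.txt")
        print(len(content))


asyncio.run(main())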

Tags: Event loop, Python (language), Testing

Published at DZone with permission of Himanshu Sheth. See the original article here.

Opinions expressed by DZone contributors are their own.
