Exploring Python’s asyncio (Asynchronous Programming) Library

Asynchronous programming allows developers to handle tasks that may take some time to complete without blocking the execution of other tasks. Python’s asyncio library is a popular tool for writing asynchronous code that is efficient and clean. Whether you’re working with APIs, managing database connections, or building real-time applications, asyncio can simplify the process.

asyncio is crucial when you have tasks that would otherwise run in series but could benefit from concurrent execution, especially when those tasks are I/O-bound (e.g., file operations, database queries, network requests).

How asyncio Helps Manage Execution Order

The asyncio library allows you to control the flow of execution with features like await, ensuring that one coroutine waits for another to finish before proceeding. If you’ve ever had a function try to use a value or state before it’s ready, asyncio can resolve this by explicitly managing the dependencies between tasks.
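
For example, here is a minimal sketch (the load_config and start_server names are purely illustrative) of one coroutine awaiting another, so that dependent work never runs before its input is ready:

import asyncio

async def load_config():
    await asyncio.sleep(1)        # simulate slow I/O, such as reading a file
    return {"timeout": 30}

async def start_server():
    config = await load_config()  # execution pauses here until the config is ready
    print(f"Starting with timeout={config['timeout']}")

asyncio.run(start_server())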

This article provides an introduction to asyncio, its core concepts, and real-world examples. You’ll also see how it compares to traditional multithreading and multiprocessing.

Setting Up The Environment

Before diving into the code, create a virtual environment to keep the project dependencies isolated.

On Windows:

python -m venv asyncio_env
.\asyncio_env\Scripts\activate
pip install aiohttp

On Linux:

python3 -m venv asyncio_env
source asyncio_env/bin/activate
pip install aiohttp

What is Asynchronous Programming?

In synchronous programming, tasks are executed one after the other. If one task takes longer, the entire program waits. For example, reading a file or making an API call can block the execution of the next task.

Asynchronous programming solves this by allowing other tasks to run while waiting for slow operations to complete. This is useful when handling I/O-bound tasks like API calls or database queries. With asyncio, you can write code that is non-blocking, making better use of resources and improving performance.

Key asyncio functions in this article (a short sketch contrasting the two follows this list):

1. asyncio.gather()

What It Does:

  • Runs multiple async tasks concurrently and collects all their results.
  • Waits for all tasks to complete before returning.
  • If any task raises an exception, asyncio.gather() will raise that exception (unless you set return_exceptions=True).

Use Case:

  • When you need to run multiple tasks and process their results after all tasks have completed.

2. asyncio.as_completed()

What It Does:

  • Runs multiple tasks concurrently and yields an awaitable for each task in the order they complete (not the order they were started).
  • Allows you to process results as soon as tasks finish.

Use Case:

  • When you want to start processing results immediately as they become available, without waiting for all tasks to finish.
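
To make the difference concrete, here is a small sketch (the work coroutine and its delays are chosen only for illustration) that runs the same coroutines through both helpers:

import asyncio

async def work(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"

async def main():
    # gather(): results come back in the order the tasks were passed in
    results = await asyncio.gather(work("A", 2), work("B", 1))
    print(results)  # ['A done after 2s', 'B done after 1s']

    # as_completed(): results are handled as soon as each task finishes
    for finished in asyncio.as_completed([work("A", 2), work("B", 1)]):
        print(await finished)  # B first, then A

asyncio.run(main())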

Core Features of asyncio

1. Coroutines

A coroutine is a special function defined with async def. These functions can be paused and resumed, allowing other tasks to run in the meantime.

import asyncio

async def greet(name):
    await asyncio.sleep(2)
    print(f"Hello, {name}!")

asyncio.run(greet("Alice"))

In this example, asyncio.sleep(2) simulates a non-blocking delay.

2. Tasks

Tasks allow you to run multiple coroutines concurrently. They’re like lightweight threads managed by the asyncio event loop.

import asyncio

async def say_hello(name, delay):
    await asyncio.sleep(delay)
    print(f"Hello, {name}!")

async def main():
    task1 = asyncio.create_task(say_hello("Alice", 2))
    task2 = asyncio.create_task(say_hello("Bob", 1))
    await task1
    await task2

asyncio.run(main())

The two greetings run concurrently rather than one after the other: Bob’s shorter delay finishes first, and the whole program completes in about 2 seconds (the longest delay) instead of 3, demonstrating how tasks overlap their waiting time.

Without asyncio, the two greeting tasks would execute sequentially, meaning the second task would not start until the first one fully completed. This would block the program’s execution during the sleep period of each task, leading to wasted time and inefficiency.
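
For comparison, here is a minimal synchronous equivalent using time.sleep(): the delays add up to roughly 3 seconds, whereas the task-based version above finishes in about 2 seconds (the longest single delay).

import time

def say_hello(name, delay):
    time.sleep(delay)  # blocks the whole program while waiting
    print(f"Hello, {name}!")

say_hello("Alice", 2)  # finishes after 2 seconds
say_hello("Bob", 1)    # starts only after Alice is done, total is about 3 seconds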

3. Event Loop

The event loop is the heart of asyncio. It schedules and executes tasks, ensuring that each coroutine gets time to run.

You don’t usually interact directly with the event loop, as asyncio.run() handles it for you. However, understanding its role is crucial to writing effective asynchronous programs.
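
To give a rough idea of what asyncio.run() takes care of, here is a simplified sketch of the equivalent manual loop management (the real implementation also cancels leftover tasks and shuts down async generators before closing the loop):

import asyncio

async def job():
    await asyncio.sleep(1)
    return "done"

# Simplified sketch of what asyncio.run(job()) does behind the scenes
loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(job())
    print(result)
finally:
    loop.close()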

Real-World Examples

Example 1: Making HTTP Requests

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3"
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)
        for i, response in enumerate(responses):
            print(f"Response {i + 1}:", response[:100])  # Print first 100 characters

asyncio.run(main())

This code fetches data from three URLs concurrently, significantly reducing the total execution time. Expected result:

Response 1: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio
Response 2: {
  "userId": 1,
  "id": 2,
  "title": "qui est esse",
  "body": "est rerum tempore vitae\nsequi sin
Response 3: {
  "userId": 1,
  "id": 3,
  "title": "ea molestias quasi exercitationem repellat qui ipsa sit aut"

Simulating a failure to show the benefit of asyncio:

Let’s intentionally break the first URL so that its request fails, and observe how the other two URLs can still retrieve their data. The code below demonstrates how asyncio handles this scenario compared to a synchronous approach, where the broken URL would disrupt or break the entire process.

Note: We use asyncio.as_completed() because it returns an iterator that yields tasks as they finish, no matter the order they started. This helps demonstrate how asyncio processes tasks concurrently.

import asyncio
import aiohttp

async def fetch_url(session, url):
    try:
        async with session.get(url) as response:
            if response.status != 200:
                return f"Error: HTTP {response.status} for {url}"
            return await response.text()
    except Exception as e:
        return f"Error: {str(e)} for {url}"

async def main():
    urls = [
        "https://jsonplaceholder.typicode.scom/posts/1",  # Invalid URL to simulate 404
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3"
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        
        for task in asyncio.as_completed(tasks):
            response = await task
            print(response[:100])  # Print the result or error as soon as it's ready

asyncio.run(main())

Example 2: Database Queries

Consider a script that queries multiple databases. Here’s how asyncio handles this scenario.

import sqlite3
import asyncio

# Create and populate the databases
def create_and_populate_databases():
    # Create dbEVEN
    conn_even = sqlite3.connect("dbEVEN.db")
    cursor_even = conn_even.cursor()
    cursor_even.execute("CREATE TABLE IF NOT EXISTS even (number INTEGER)")
    cursor_even.executemany(
        "INSERT INTO even (number) VALUES (?)", [(2,), (4,), (6,), (8,), (10,)]
    )
    conn_even.commit()
    conn_even.close()

    # Create dbODD
    conn_odd = sqlite3.connect("dbODD.db")
    cursor_odd = conn_odd.cursor()
    cursor_odd.execute("CREATE TABLE IF NOT EXISTS odd (number INTEGER)")
    cursor_odd.executemany(
        "INSERT INTO odd (number) VALUES (?)", [(1,), (3,), (5,), (7,), (9,)]
    )
    conn_odd.commit()
    conn_odd.close()

# Fetch data from a database
def fetch_data(database, query):
    conn = sqlite3.connect(database)
    cursor = conn.cursor()
    cursor.execute(query)
    result = cursor.fetchall()
    conn.close()
    return result

# Wrapper that runs the blocking fetch_data call in the default thread pool so it can be awaited
async def fetch_data_async(database, query):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, fetch_data, database, query)

# Main function to fetch data concurrently
async def main():
    # Create and populate the databases
    create_and_populate_databases()

    # Queries for each database
    queries = [
        ("dbEVEN.db", "SELECT * FROM even"),
        ("dbODD.db", "SELECT * FROM odd"),
    ]

    # Create tasks to fetch data concurrently
    tasks = [fetch_data_async(database, query) for database, query in queries]
    results = await asyncio.gather(*tasks)

    # Display results
    for i, result in enumerate(results):
        print(f"Result {i + 1}: {result}")

# Run the asyncio event loop
asyncio.run(main())

The code above demonstrates fetching data concurrently using asyncio.

The expected result:

Result 1: [(2,), (4,), (6,), (8,), (10,)]
Result 2: [(1,), (3,), (5,), (7,), (9,)]

Simulating a blocked database to show the benefit of asyncio

We’ll create an exclusive lock on dbEVEN.db. This will prevent the first query from completing until the lock is released. Meanwhile, the second query to dbODD.db will execute as expected, demonstrating how asyncio handles concurrency even when one task is blocked.

Updated Code: Lock dbEVEN.db

import sqlite3
import asyncio
import threading

# Create and populate the databases
def create_and_populate_databases():
    # Create dbEVEN
    conn_even = sqlite3.connect("dbEVEN.db")
    cursor_even = conn_even.cursor()
    cursor_even.execute("CREATE TABLE IF NOT EXISTS even (number INTEGER)")
    cursor_even.executemany(
        "INSERT INTO even (number) VALUES (?)", [(2,), (4,), (6,), (8,), (10,)]
    )
    conn_even.commit()
    conn_even.close()

    # Create dbODD
    conn_odd = sqlite3.connect("dbODD.db")
    cursor_odd = conn_odd.cursor()
    cursor_odd.execute("CREATE TABLE IF NOT EXISTS odd (number INTEGER)")
    cursor_odd.executemany(
        "INSERT INTO odd (number) VALUES (?)", [(1,), (3,), (5,), (7,), (9,)]
    )
    conn_odd.commit()
    conn_odd.close()

# Lock dbEVEN.db to simulate blocking
def lock_database():
    conn = sqlite3.connect("dbEVEN.db")
    cursor = conn.cursor()
    cursor.execute("BEGIN EXCLUSIVE")  # Create an exclusive lock
    print("dbEVEN.db is locked.")
    try:
        threading.Event().wait(5)  # Hold the lock for 5 seconds
    finally:
        conn.rollback()  # Release the lock
        conn.close()
        print("dbEVEN.db lock released.")

# Fetch data from a database
def fetch_data(database, query):
    conn = sqlite3.connect(database)
    cursor = conn.cursor()
    cursor.execute(query)
    result = cursor.fetchall()
    conn.close()
    return result

# Wrapper that runs the blocking fetch_data call in the default thread pool so it can be awaited
async def fetch_data_async(database, query):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, fetch_data, database, query)

# Main function to fetch data concurrently
async def main():
    # Create and populate the databases
    create_and_populate_databases()

    # Start the database lock in a separate thread
    lock_thread = threading.Thread(target=lock_database)
    lock_thread.start()

    # Queries for each database
    queries = [
        ("dbEVEN.db", "SELECT * FROM even"),
        ("dbODD.db", "SELECT * FROM odd"),
    ]

    # Create tasks to fetch data concurrently
    tasks = [fetch_data_async(database, query) for database, query in queries]

    # Process tasks as they finish
    for completed_task in asyncio.as_completed(tasks):
        try:
            result = await completed_task
            print(f"Result: {result}")
        except Exception as e:
            print(f"Error: {e}")

    # Wait for the lock thread to finish
    lock_thread.join()

# Run the asyncio event loop
asyncio.run(main())

Expected result:

dbEVEN.db is locked.
Result: [(1,), (3,), (5,), (7,), (9,)]
dbEVEN.db lock released.
Result: [(2,), (4,), (6,), (8,), (10,)]

asyncio vs. Multithreading and Multiprocessing

  • Multithreading: Threads share the same memory space, which makes sharing data easy, but in CPython the Global Interpreter Lock (GIL) means threads mostly help with I/O-bound tasks, and they can be tricky to manage due to race conditions and thread synchronization.
  • Multiprocessing: Processes have separate memory spaces and are better suited for CPU-bound tasks. They come with higher overhead due to inter-process communication.
  • asyncio: Focused on I/O-bound tasks, asyncio shines when dealing with high-latency operations like API calls or database queries. It avoids the complexities of threading while achieving concurrency, and it can delegate CPU-bound work to a process pool when needed, as the sketch below shows.
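
These approaches can also be combined. Just as Example 2 used run_in_executor() with the default thread pool, an asyncio program can hand CPU-bound work to a process pool so the event loop stays free for I/O. Below is a minimal sketch, with heavy_sum as a purely illustrative stand-in for real CPU-bound work:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_sum(n):
    # CPU-bound work that would block the event loop if awaited directly
    return sum(range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Offload the CPU-bound call to a separate process while the
        # event loop remains free to run other coroutines
        result = await loop.run_in_executor(pool, heavy_sum, 10_000_000)
        print(result)

if __name__ == "__main__":
    asyncio.run(main())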

Thank you for reading this article. I hope you found it helpful and informative. If you have any questions, or if you would like to suggest new Python code examples or topics for future tutorials, please feel free to reach out. Your feedback and suggestions are always welcome!

Happy coding!
C. C. Python Programming

You can also find this article at Medium.com
