Concurrency in Python

Explaining Python’s concurrency models with clarity, context, and real understanding

Python’s concurrency tools let you optimize programs by running tasks in overlapping periods—great for cutting down wait times in I/O-heavy work and taking advantage of multiple CPU cores for compute-intensive tasks. In this article, I’m focusing on the most practical and widely used approaches: multithreading for I/O-bound workloads, multiprocessing for CPU-bound tasks, and asyncio for high-scale, single-threaded efficiency.

Concurrency in Python is a deep topic, and no single article can cover every nuance. There are advanced patterns, tricky edge cases, and evolving best practices you’ll discover over time. Treat this as a solid starting point. Then experiment, break things, debug them, and refine your understanding—hands-on work is what really makes these concepts click.

And please, read this sequentially from start to finish… not concurrently. Otherwise, you may end up with a race condition in your understanding. 😄

Let’s dive in!

What is concurrency?

Concurrency means that the tasks defined in your code run simultaneously, or in an interleaved way that appears almost simultaneous (pseudo-simultaneous), so that time and compute resources are used efficiently.


Why is it required?

  1. I/O bound task

    Simply put, by default Python code runs sequentially. Whenever a blocking task is encountered, the execution waits until the current task completes before moving forward. In some scenarios, this wait period is long enough that system resources remain largely idle. We can take advantage of this window to execute other tasks and return once the original task is ready to continue.

  2. CPU bound task

    There are cases where multiple tasks require heavy CPU processing. With sequential execution, each task must finish before the next one starts. Using multiprocessing, these tasks can be distributed across different CPU cores and executed in parallel, reducing overall execution time.


Important Terminologies

Thread

A thread is a unit of execution within a process that runs a piece of code. By default a Python program has a single thread, so all tasks are handled by that thread sequentially.

For example, suppose we have three tasks:

  • Load customers dataset.

  • Load accounts dataset.

  • Load orders dataset.

Multi-threading allows us to create multiple threads to perform the tasks concurrently.

Note: This is not the same as parallel computing or multiprocessing. The exact difference is discussed later in the Multithreading section.
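As a rough sketch of the three loads above running on separate threads: the `load_dataset` helper and its `time.sleep` delay are made-up stand-ins for a real disk or network read, not part of any library.

```python
import threading
import time


def load_dataset(name: str, results: dict, delay: float = 0.1) -> None:
    """Hypothetical loader: the sleep stands in for slow I/O."""
    time.sleep(delay)
    results[name] = f"{name} loaded"


def load_all(names: list) -> dict:
    results = {}  # shared by all threads; each thread writes its own key
    threads = [
        threading.Thread(target=load_dataset, args=(name, results))
        for name in names
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # wait until every load has finished
    return results


if __name__ == "__main__":
    print(load_all(["customers", "accounts", "orders"]))
```

The three sleeps overlap, so the whole call takes roughly as long as one load rather than three.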

Multiprocessing

Multiprocessing is the ability to execute tasks using multiple processes. Each process has its own memory space and can run on a separate CPU core, enabling true parallel execution.

CPython

CPython is the reference implementation of Python. It is:

  1. a bytecode interpreter

  2. written in C

  3. managing millions of tiny objects

  4. optimized for single-thread performance

  5. The core technical problem CPython faces:

    1. Every Python object has reference counts, mutable internal state, shared memory.

    2. So CPython must ensure:

      • reference counts stay correct

      • objects aren’t freed while still in use

      • memory isn’t corrupted
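To make the reference-count bookkeeping concrete, here is a small sketch using `sys.getrefcount` on CPython. The function name `refcount_delta` is only for illustration, and the absolute counts vary between Python versions, so the sketch only looks at differences.

```python
import sys


def refcount_delta() -> tuple:
    obj = object()
    before = sys.getrefcount(obj)
    holders = [obj, obj]            # the list stores two extra references
    after = sys.getrefcount(obj)
    del holders                     # dropping the list releases both of them
    released = sys.getrefcount(obj)
    # (references added by the list, references left over after deleting it)
    return after - before, released - before


if __name__ == "__main__":
    print(refcount_delta())
```

Every one of these increments and decrements touches shared state inside the object, which is exactly what has to stay correct when several threads run.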

GIL

The Global Interpreter Lock (GIL) is a mechanism in CPython that allows only one thread to execute Python bytecode at a time, which limits true parallelism in multithreaded, CPU-bound programs.

The GIL ensures:

  • only one thread executes Python bytecode at a time

  • threads switch at well-defined points

  • memory state stays consistent

It does not:

  • prevent I/O parallelism

  • block native code from running

  • affect multiprocessing
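A small sketch of the first point: `time.sleep` is a blocking call that releases the GIL, so several sleeping threads overlap instead of queueing up. The helper name `timed_sleeps` is made up for this example; `sys.getswitchinterval()` reports how often a running thread is asked to hand over the GIL.

```python
import sys
import threading
import time


def timed_sleeps(n_threads: int = 4, seconds: float = 0.2) -> float:
    """Run n_threads blocking sleeps on separate threads; return wall time."""
    threads = [
        threading.Thread(target=time.sleep, args=(seconds,))
        for _ in range(n_threads)
    ]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print("switch interval (s):", sys.getswitchinterval())
    print(f"4 x 0.2s sleeps took {timed_sleeps():.2f}s")
```

If the GIL were held during the sleep, four 0.2 s sleeps would take about 0.8 s; in practice the total stays close to 0.2 s.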

Why is GIL required?

  1. Without a GIL, every single object operation would need locks.

  2. CPython designers chose one global lock instead of many tiny locks.

  3. This dramatically simplifies:

    • memory management

    • garbage collection

    • C-extension APIs

    • interpreter correctness

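  4. Removing the GIL is a deep change to the interpreter rather than a small patch (CPython 3.13 added an optional, experimental free-threaded build under PEP 703). It involves or risks: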

    • rewriting memory management

    • redesigning object model

    • breaking C extensions

    • slowing down single-thread code

    • introducing subtle race bugs

*Other definitions

Terms marked with * elsewhere in this article are defined below; refer back to these definitions for context when you encounter them:

  • Race Condition – A race condition happens when two or more threads or processes try to access and modify the same shared resource at the same time, and the final outcome depends on the order in which those operations happen. Since this order is unpredictable, the result becomes inconsistent, incorrect, and difficult to debug.

    Example
    Imagine you have a shared variable balance = 100.
    Two threads are trying to withdraw 50 at the same time.

    Both read the value as 100, both subtract 50, and both write back 50.
    Logically, the balance should be 0, but you end up with 50. That is a race condition.

    Why it happens

    • Shared resource

    • Multiple threads accessing it

    • No proper coordination or locking

    Result

    • Random output

    • Rare bugs

    • Inconsistent behavior

  • Event Loop – The event loop is the core of asyncio. It continuously runs in a single thread, schedules tasks, and switches between coroutines whenever they pause on an awaited operation. Instead of waiting idly, the event loop keeps other tasks moving, which enables concurrency without using multiple threads.

  • Coroutines – Coroutines are special functions defined with async def that support asynchronous execution. Instead of blocking, they pause using await while waiting for I/O or other asynchronous work, allowing the event loop to run other coroutines in the meantime.

  • Non-Blocking Operation – A non-blocking operation is an operation that does not stop execution while waiting for a result. Instead of freezing the program, it immediately returns control and resumes later when the result is ready. In asyncio, network I/O and timers (for example asyncio.sleep()) are non-blocking, allowing other tasks to run during the wait time. Regular file operations are still blocking and are usually offloaded to a thread.
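The three asyncio definitions above fit together as in the following minimal sketch. The `worker` and `main` names are only for illustration; `asyncio.sleep` plays the role of a non-blocking wait.

```python
import asyncio


async def worker(name: str, delay: float, log: list) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(delay)   # non-blocking: control goes back to the event loop
    log.append(f"{name} end")


async def main() -> list:
    log = []
    # Both coroutines run on the same event loop, in a single thread
    await asyncio.gather(worker("A", 0.05, log), worker("B", 0.01, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
    # ['A start', 'B start', 'B end', 'A end']
```

B starts while A is still waiting and finishes first, even though no second thread is involved.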


Models for concurrency

Multithreading

  • Used primarily for I/O-bound tasks, where threads spend most of their time waiting on OS I/O, not executing Python bytecode.

  • In such scenarios, multiple threads are used.

  • Each thread is responsible for a particular task, but tasks may share data and state.

  • A thread releases the GIL when it enters a blocking I/O operation implemented in C that explicitly releases it. The interpreter also asks the running thread to give up the GIL periodically (every 5 ms by default), so CPU-bound threads take turns as well.

  • Another runnable thread may acquire the GIL and continue executing Python bytecode.

  • Thus, while some threads are blocked on I/O, other threads can make progress, reducing idle CPU time.

  • All threads share:

    • variables

    • memory

    • interpreter

    • GIL

  • This sharing is why data corruption through *race conditions is possible.

  • Threads can manipulate shared global variables, which requires explicit synchronization to avoid race conditions.

Demo

This demo uses the AniList API to compare the time taken by a synchronous approach and an asynchronous (threaded) approach to fetch the results of 15 requests.

AnimeAPI:

import requests

class AnimeAPI:

    def __init__(self):
        self.query = '''
query ($id: Int) { # Define which variables will be used in the query (id)
  Media (id: $id, type: ANIME) { # Insert our variables into the query arguments (id) (type: ANIME is hard-coded in the query)
    id
    title {
      romaji
      english
      native
    }
  }
}
'''
        self.url = 'https://graphql.anilist.co'

    def response(self, anime_id:int):
        variables = {
            'id': anime_id
        }
        response = requests.post(self.url, json={'query': self.query, 'variables': variables})
        return response.json()

Multithreading execution:

from api import AnimeAPI
from time import perf_counter
from concurrent.futures import ThreadPoolExecutor, as_completed

anime = AnimeAPI()

anime_ids = [100,200,300,400, 700, 628, 524, 377, 826, 451, 280, 395, 399, 124, 626]


# Sync code #############################################################################

anime_dic = {}

sync_start = perf_counter()

for anime_id in anime_ids:
    try:
        anime_dic[anime_id] = anime.response(anime_id)
    except Exception as e:
        anime_dic[anime_id] = e

sync_end = perf_counter()

#####################################################################################

# Async code ############################################################################

anime_dic_async = {}

async_start = perf_counter()

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(anime.response, i): i for i in anime_ids}

    for future in as_completed(futures):

        # Use a separate name so the AnimeAPI instance `anime` is not overwritten
        anime_id = futures[future]

        try:
            data = future.result()
        except Exception as e:
            data = e

        anime_dic_async[anime_id] = data

async_end = perf_counter()

#####################################################################################

sync_time = sync_end - sync_start
async_time = async_end - async_start

print(f'''
Number of animes requested synchronously: {len(anime_ids)}
Number of outputs received synchronously: {len(anime_dic)}
Time taken for synchronous execution: {sync_time}

##############################################################

Number of animes requested asynchronously: {len(anime_ids)}
Number of outputs received asynchronously: {len(anime_dic_async)}
Time taken for asynchronous execution: {async_time}
''')

Output:

Number of animes requested synchronously: 15
Number of outputs received synchronously: 15
Time taken for synchronous execution: 10.28090550005436

##############################################################

Number of animes requested asynchronously: 15
Number of outputs received asynchronously: 15
Time taken for asynchronous execution: 3.0076786999125034
💡
Note the significant reduction in execution time: with 5 worker threads, the 15 requests finish in roughly a third of the time.

Race condition demo:

from time import sleep
from threading import Thread

balance = 1000

def withdraw(amount: float):
    global balance
    temp = balance
    sleep(0.001)
    balance = temp - amount

threads = [Thread(target=withdraw, args=[50]) for _ in range(4)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

print(f"Expected: 800, Actual: {balance}")

Output:

Expected: 800, Actual: 950

Solving race condition using Lock:

from time import sleep
from threading import Thread, Lock, current_thread

balance = 1000

lock = Lock()

def withdraw(amount: float):
    global balance
    with lock:
        temp = balance
        print(f"Thread {current_thread()}: Reading balance = {temp}")
        sleep(0.001)
        balance = temp - amount
        print(f"Thread {current_thread()}: New balance = {balance}")

threads = [Thread(target=withdraw, args=[50]) for _ in range(4)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

print(f"Expected: 800, Actual: {balance}")

Output:

Thread <Thread(Thread-1 (withdraw), started 27604)>: Reading balance = 1000
Thread <Thread(Thread-1 (withdraw), started 27604)>: New balance = 950
Thread <Thread(Thread-2 (withdraw), started 29580)>: Reading balance = 950
Thread <Thread(Thread-2 (withdraw), started 29580)>: New balance = 900
Thread <Thread(Thread-3 (withdraw), started 29852)>: Reading balance = 900
Thread <Thread(Thread-3 (withdraw), started 29852)>: New balance = 850
Thread <Thread(Thread-4 (withdraw), started 7656)>: Reading balance = 850
Thread <Thread(Thread-4 (withdraw), started 7656)>: New balance = 800
Expected: 800, Actual: 800

Multiprocessing

  • Each process has its own threads and its own GIL, so processes do not share a GIL with each other.

  • When there are multiple CPU-bound tasks, they are distributed among different cores of the CPU, each handling the task in an isolated environment. Thus, multiple processes run on multiple cores.

  • The OS scheduler maps processes to cores, enabling true parallel execution of CPU-bound work.

  • Each process has its own copy of:

    • variables

    • memory

    • interpreter

    • GIL

  • Memory is not shared by default, so data passed between processes has to be serialized (pickled), which adds overhead.

  • Processes cannot directly manipulate the same global variable because they do not share memory space. Shared memory can be explicitly created, but it is not the default and must be managed carefully.

  • For orchestrating processes and building multiprocessing pipelines for data transfer, Queue is used. It lets producers push data and consumers retrieve it safely, without conflicts.
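As a sketch of the explicitly created shared memory mentioned above, `multiprocessing.Value` places a single C integer in shared memory that every process can update. The helper names `add_many` and `run_counter` are made up for this example.

```python
from multiprocessing import Process, Value


def add_many(counter, n: int) -> None:
    for _ in range(n):
        with counter.get_lock():   # "+= 1" on shared memory is not atomic
            counter.value += 1


def run_counter(n_procs: int = 4, n: int = 1000) -> int:
    counter = Value("i", 0)        # a C int living in shared memory
    procs = [Process(target=add_many, args=(counter, n)) for _ in range(n_procs)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    return counter.value


if __name__ == "__main__":
    print(run_counter())           # 4000
```

Without the lock the increments from different processes can overwrite each other, which is the same race condition shown earlier for threads.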

Demo

import os
from time import perf_counter
from concurrent.futures import ProcessPoolExecutor, as_completed


def calculate_factorial(start: int, end: int):
    result = 1
    for i in range(start, end + 1):
        result *= i
    # Adding extra computation to make it CPU-heavy
    for _ in range(2000000):
        result = (result % 1000000007) * 2
    return result


if __name__ == "__main__":
    processes = [(2,38), (5, 50), (68, 83), (14, 57),
                 (3,38), (6, 50), (69, 83), (15, 57),
                 (4,38), (7, 50), (70, 83), (16, 57),
                 (5,38), (8, 50), (25, 83), (17, 57),
                 (6,38), (9, 50), (26, 83), (18, 57)]
    # os.cpu_count() can return None; use half the cores but at least one worker
    cpu_count = os.cpu_count() or 1
    cpu2use = max(1, cpu_count // 2)

    # Sync code #############################################################################

    results = {}

    sync_start = perf_counter()

    for process in processes:
        try:
            results[process] = calculate_factorial(*process)
        except Exception as e:
            results[process] = e

    sync_end = perf_counter()

    #####################################################################################

    # Async code ############################################################################

    results_async = {}

    async_start = perf_counter()

    with ProcessPoolExecutor(max_workers = cpu2use) as executor:
        futures = {executor.submit(calculate_factorial, *i): i for i in processes}

        for future in as_completed(futures):

            process_id = futures[future]

            try:
                data = future.result()
            except Exception as e:
                data = e

            results_async[process_id] = data

    async_end = perf_counter()

    #####################################################################################

    sync_time = sync_end - sync_start
    async_time = async_end - async_start

    print(f'''
Number of processes processed synchronously: {len(processes)}
Number of outputs received synchronously: {len(results)}
Time taken for synchronous execution: {sync_time}

##############################################################

Number of processes processed asynchronously: {len(processes)}
Number of outputs received asynchronously: {len(results_async)}
Time taken for asynchronous execution: {async_time}
''')

Output:

Number of processes processed synchronously: 20
Number of outputs received synchronously: 20
Time taken for synchronous execution: 3.7746968001592904

##############################################################

Number of processes processed asynchronously: 20
Number of outputs received asynchronously: 20
Time taken for asynchronous execution: 1.1230935999192297
💡
Note the significant reduction in execution time: spreading the 20 CPU-bound tasks across worker processes cuts the run from about 3.8 s to about 1.1 s.

Queue demo

import os
import json
from multiprocessing import Process, Queue


def save_data_to_json(data:list):
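    try:
        with open('data.json', 'r', encoding='utf-8') as file:
            loaded_data = json.load(file)
            loaded_data += data
    except (FileNotFoundError, json.JSONDecodeError):
        # No previous file (or unreadable JSON): start from the new data
        loaded_data = data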
    with open('data.json', 'w', encoding='utf-8') as json_file:
        json.dump(loaded_data, json_file, indent=4)
    print('Data saved.')


def producer(queue1:Queue, items:list):
    for item in items:
        print(f'Producing {item}')
        queue1.put(item)
    queue1.put(None)
    print('Producer done')


def transformer(queue1:Queue, queue2:Queue, factor:float):
    while True:
        item = queue1.get()
        if item is None:
            queue2.put(None)
            break
        transformed = factor * item
        print(f'Transforming {item} -> {transformed}')
        queue2.put(transformed)
    print('Transformer done')


def save(queue2:Queue):
    data = []
    while True:
        item = queue2.get()
        if item is None:
            break
        print(f'Getting {item}')
        data.append(item)
    save_data_to_json(data)


if __name__ == '__main__':
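    # Start from a clean slate; the file may not exist on the first run
    if os.path.exists('data.json'):
        os.remove('data.json')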

    data = [2,4,6,3,5,7,9]

    queue1 = Queue()
    queue2 = Queue()

    processes = []

    processes.append(Process(target=producer, args=(queue1, data)))
    processes.append(Process(target=transformer, args=(queue1, queue2, 0.2)))
    processes.append(Process(target=save, args=(queue2,)))

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    with open('data.json', 'r') as file:
        saved_data = json.load(file)

    print(saved_data)

Output:

Producing 2
Producing 4
Producing 6
Producing 3
Producing 5
Producing 7
Producing 9
Producer done
Transforming 2 -> 0.4
Transforming 4 -> 0.8
Transforming 6 -> 1.2000000000000002
Transforming 3 -> 0.6000000000000001
Transforming 5 -> 1.0
Transforming 7 -> 1.4000000000000001
Transforming 9 -> 1.8
Transformer done
Getting 0.4
Getting 0.8
Getting 1.2000000000000002
Getting 0.6000000000000001
Getting 1.0
Getting 1.4000000000000001
Getting 1.8
Data saved.
[0.4, 0.8, 1.2000000000000002, 0.6000000000000001, 1.0, 1.4000000000000001, 1.8]

Asyncio

  • Asyncio does not use multiprocessing. It also does not rely on multithreading by default, although it can use threads in specific situations, which we will discuss later.

  • Instead, asyncio achieves concurrency using an *event loop. The event loop runs multiple *coroutines together by scheduling them and switching between them whenever a coroutine performs a *non-blocking operation and awaits it, instead of blocking execution.

  • In simple terms, coroutines cooperatively yield control, allowing other coroutines to run during I/O waits, which reduces idle time.

  • The asyncio library provides the event loop, coroutines, tasks and futures that work together with the async and await syntax built into Python.

  • When we need to run a blocking operation inside an asyncio program, asyncio provides a way to execute it in a separate thread so the event loop doesn’t get blocked. Internally, asyncio submits these tasks to a ThreadPoolExecutor (similar to what we saw earlier in the multithreading example).

  • aiohttp is commonly used alongside asyncio as an asynchronous HTTP client and server framework. It is ideal for building RESTful APIs, handling a large number of concurrent network connections, and performing tasks like web scraping without blocking the event loop.
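The thread offloading described above can be sketched with `loop.run_in_executor`, where passing `None` selects the loop's default ThreadPoolExecutor. The `blocking_io` function is a made-up stand-in for any blocking call.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    time.sleep(seconds)              # a blocking call that would freeze the loop
    return f"slept {seconds}s"


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    # None means "use the default ThreadPoolExecutor of this event loop"
    future = loop.run_in_executor(None, blocking_io, 0.1)

    ticks = 0
    while not future.done():         # the loop stays free to do other work
        await asyncio.sleep(0.01)
        ticks += 1
    return await future, ticks


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The `ticks` counter keeps increasing while the blocking call runs, which shows that the event loop was not blocked.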

Demo

Asyncio Anime API:

import aiohttp
import asyncio


class AsyncAnimeAPI:

    def __init__(self):
        self.query = '''
query ($id: Int) {
  Media (id: $id, type: ANIME) {
    id
    title {
      romaji
      english
      native
    }
  }
}
'''
        self.url = 'https://graphql.anilist.co'

    async def response(self, anime_id: int):
        """Async method to fetch anime data"""
        variables = {
            'id': anime_id
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.url,
                    json={'query': self.query, 'variables': variables}
                ) as response:
                    return await response.json()

        except asyncio.TimeoutError:
            return {'error': 'Timeout', 'anime_id': anime_id}
        except Exception as e:
            return {'error': str(e), 'anime_id': anime_id}

timedec decorator to calculate execution time:

from functools import wraps
import time


def timedec(operation_name:str):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            end = time.perf_counter()
            print(f"Total time taken for {operation_name}: {end - start}.")
            return result
        return wrapper
    return decorator
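One caveat with the decorator above: its wrapper is a plain function, so when applied to an `async def` function it only times the creation of the coroutine object, not the awaited work. Below is a minimal sketch of a variant that handles both cases. The names `timedec_any` and `nap` are hypothetical, not part of the original code.

```python
import asyncio
import inspect
import time
from functools import wraps


def timedec_any(operation_name: str):
    """Hypothetical variant of timedec that also supports async def functions."""
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                start = time.perf_counter()
                result = await func(*args, **kwargs)   # time the awaited work
                end = time.perf_counter()
                print(f"Total time taken for {operation_name}: {end - start}.")
                return result
            return async_wrapper

        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            end = time.perf_counter()
            print(f"Total time taken for {operation_name}: {end - start}.")
            return result
        return wrapper
    return decorator


@timedec_any("async sleep")
async def nap(seconds: float) -> float:
    await asyncio.sleep(seconds)
    return seconds


if __name__ == "__main__":
    asyncio.run(nap(0.05))
```

The decorated `nap` is still a coroutine function, so it can be passed to `asyncio.run()` or awaited as usual.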

Asyncio implementation:

import asyncio
from api import AnimeAPI
from asyncio_api import AsyncAnimeAPI
from timedecorator import timedec


@timedec('synchronous execution')
def sync_execution(anime_ids:list):

    anime_api = AnimeAPI()

    for anime_id in anime_ids:
        try:
            _ = anime_api.response(anime_id)
        except Exception as e:
            _ = e


@timedec('asynchronous execution')
async def async_execution(anime_ids:list):

    anime_api = AsyncAnimeAPI()

    tasks = [anime_api.response(idx) for idx in anime_ids]
    _ = await asyncio.gather(*tasks)


if __name__ == "__main__":
    anime_ids = [100,200,300,400, 700, 628, 524, 377, 826, 451, 280, 395, 399, 124, 626]
    sync_execution(anime_ids)
    asyncio.run(async_execution(anime_ids))

Output:

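Total time taken for synchronous execution: 7.806094500003383.
Total time taken for asynchronous execution: 3.0999071896076202e-06.

Note that the asynchronous figure is not a real measurement of the requests. timedec wraps async_execution in a synchronous wrapper, so calling it only creates the coroutine object, and the timer stops before asyncio.run() executes it. To time the actual work, the timing has to surround the awaited call (or the asyncio.run() call).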

When working with asyncio, blocking functions can freeze the event loop.
asyncio.to_thread() lets you run those blocking operations in a separate thread, so your async program remains responsive.
Demo:

from pprint import pprint
import asyncio
from api import AnimeAPI #Refer to this API code in 'Multithreading' section


async def get_response(api:AnimeAPI, anime_id:int):
    return await asyncio.to_thread(api.response, anime_id)


async def main():

    taskgroup_anime_ids = [100,200,300,400,600]

    anime_api = AnimeAPI()

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(get_response(anime_api, idx)) for idx in taskgroup_anime_ids]

    results = [task.result() for task in tasks]

    pprint(results)


if __name__ == "__main__":
    asyncio.run(main())

Output:

[{'data': {'Media': {'id': 100,
                     'title': {'english': 'Prétear: The New Legend of Snow '
                                          'White',
                               'native': '新白雪姫伝説プリーティア',
                               'romaji': 'Shin Shirayuki-hime Densetsu '
                                         'Pretear'}}}},
 {'data': {'Media': {'id': 200,
                     'title': {'english': None,
                               'native': '天使な小生意気',
                               'romaji': 'Tenshi na Konamaiki'}}}},
 {'data': {'Media': {'id': 300,
                     'title': {'english': '3x3 Eyes',
                               'native': '3×3EYES',
                               'romaji': '3x3 EYES'}}}},
 {'data': {'Media': {'id': 400,
                     'title': {'english': 'Outlaw Star',
                               'native': '星方武侠アウトロースター',
                               'romaji': 'Seihou Bukyou Outlaw Star'}}}},
 {'data': {'Media': {'id': 600,
                     'title': {'english': None,
                               'native': 'レジェンドオブ・デュオ',
                               'romaji': 'Legend of Duo'}}}}]

TaskGroup:
asyncio.TaskGroup (available since Python 3.11) provides structured concurrency in Python. Instead of manually creating and managing tasks, a TaskGroup groups related asynchronous operations together and guarantees that they are all tracked, awaited, and cleaned up safely.
When you create tasks inside a TaskGroup, all of them run concurrently, and the block does not exit until every task completes. If any task raises an exception, the TaskGroup automatically cancels the remaining tasks and propagates the error in a predictable way. This prevents “orphan” background tasks, missing results, and silent failures that commonly occur when managing tasks manually with create_task().
In simple terms, TaskGroup makes asynchronous code safer and more reliable by enforcing lifecycle management for tasks, so you do not have to do it yourself.

gather vs TaskGroup:
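asyncio.gather() runs multiple awaitables concurrently and waits for their results, but error handling can be messy: by default the first exception is propagated to the caller while the other awaitables are not cancelled and keep running in the background.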
TaskGroup provides structured concurrency: tasks belong to a group, errors are handled predictably, and no task is accidentally left running in the background.

from pprint import pprint
import asyncio
from asyncio_api import AsyncAnimeAPI


async def main():

    taskgroup_anime_ids = [100,200,300,400]
    independent_anime_ids = [600,853]

    anime_api = AsyncAnimeAPI()

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(anime_api.response(idx)) for idx in taskgroup_anime_ids]

    results = [task.result() for task in tasks]

    tasks2 = [asyncio.create_task(anime_api.response(idx)) for idx in independent_anime_ids]

    results.extend(await asyncio.gather(*tasks2))

    pprint(results)


if __name__ == "__main__":
    asyncio.run(main())

Output:

[{'data': {'Media': {'id': 100,
                     'title': {'english': 'Prétear: The New Legend of Snow '
                                          'White',
                               'native': '新白雪姫伝説プリーティア',
                               'romaji': 'Shin Shirayuki-hime Densetsu '
                                         'Pretear'}}}},
 {'data': {'Media': {'id': 200,
                     'title': {'english': None,
                               'native': '天使な小生意気',
                               'romaji': 'Tenshi na Konamaiki'}}}},
 {'data': {'Media': {'id': 300,
                     'title': {'english': '3x3 Eyes',
                               'native': '3×3EYES',
                               'romaji': '3x3 EYES'}}}},
 {'data': {'Media': {'id': 400,
                     'title': {'english': 'Outlaw Star',
                               'native': '星方武侠アウトロースター',
                               'romaji': 'Seihou Bukyou Outlaw Star'}}}},
 {'data': {'Media': {'id': 600,
                     'title': {'english': None,
                               'native': 'レジェンドオブ・デュオ',
                               'romaji': 'Legend of Duo'}}}},
 {'data': {'Media': {'id': 853,
                     'title': {'english': 'Ouran High School Host Club',
                               'native': '桜蘭高校ホスト部',
                               'romaji': 'Ouran Koukou Host Club'}}}}]

Some more useful concepts in asyncio you can refer to:

Semaphore
A semaphore is used to limit how many coroutines can run a specific piece of code at the same time. This is useful when hitting APIs with rate limits, restricting database connections, or controlling access to limited resources.
You acquire it before running a task and release it when done. If the limit is reached, other coroutines wait.
In short: Semaphore = controlled concurrency instead of unlimited concurrency.

AnimeAPI integrated with Semaphore:

import asyncio
from asyncio_api import AsyncAnimeAPI


class SemaphoreDemo(AsyncAnimeAPI):

    def __init__(self, sem: asyncio.Semaphore):
        super().__init__()
        self.sem = sem

    async def worker(self, anime_id: int):
        print(f"Task {anime_id}: Waiting for semaphore...")

        async with self.sem:
            print(f"Task {anime_id}: Acquired semaphore, starting request")
            result = await self.response(anime_id)
            print(f"Task {anime_id}: Request completed")

        print(f"Task {anime_id}: Released semaphore")
        return result

Implementing Semaphore with value 2:
This will allow only 2 operations to run concurrently at a time.

import asyncio
from sem import SemaphoreDemo


async def main():
    semaphore = asyncio.Semaphore(2)
    demo = SemaphoreDemo(semaphore)

    anime_ids = [100,200,300,400]

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(demo.worker(idx)) for idx in anime_ids]

    _ = [task.result() for task in tasks]


if __name__ == '__main__':
    asyncio.run(main())

Output:

Task 100: Waiting for semaphore...
Task 100: Acquired semaphore, starting request
Task 200: Waiting for semaphore...
Task 200: Acquired semaphore, starting request
Task 300: Waiting for semaphore...
Task 400: Waiting for semaphore...
Task 200: Request completed
Task 200: Released semaphore
Task 300: Acquired semaphore, starting request
Task 100: Request completed
Task 100: Released semaphore
Task 400: Acquired semaphore, starting request
Task 300: Request completed
Task 300: Released semaphore
Task 400: Request completed
Task 400: Released semaphore
💡
The output above shows how the Semaphore ensures that only 2 operations run at a time, as configured.

Lock
A lock ensures that only one coroutine accesses a shared resource at a time. This prevents race conditions when multiple coroutines try to modify shared state.
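
A minimal sketch of the idea, reusing the balance example from the threading section. The `withdraw` coroutine is illustrative, and `asyncio.sleep(0)` is there only to force a switch to another coroutine in the middle of the update.

```python
import asyncio


async def withdraw(account: dict, amount: int, lock: asyncio.Lock) -> None:
    async with lock:                  # only one coroutine in this block at a time
        current = account["balance"]
        await asyncio.sleep(0)        # yield to the event loop mid-update
        account["balance"] = current - amount


async def main() -> int:
    account = {"balance": 1000}
    lock = asyncio.Lock()
    await asyncio.gather(*(withdraw(account, 50, lock) for _ in range(4)))
    return account["balance"]


if __name__ == "__main__":
    print(asyncio.run(main()))        # 800
```

Without the lock, all four coroutines would read 1000 before any of them writes, and the final balance would be 950.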

Event
An event is a signaling mechanism. One coroutine can set an event, and others waiting on it will resume. Useful for coordination between tasks.
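
A minimal sketch, with made-up names `waiter` and `ready`: two coroutines wait on the same event and both resume once it is set.

```python
import asyncio


async def waiter(name: str, ready: asyncio.Event, log: list) -> None:
    await ready.wait()                # suspends until the event is set
    log.append(f"{name} resumed")


async def main() -> list:
    ready = asyncio.Event()
    log = []
    waiters = [asyncio.create_task(waiter(f"w{i}", ready, log)) for i in range(2)]
    await asyncio.sleep(0.01)         # both waiters are now parked on ready.wait()
    log.append("setting event")
    ready.set()                       # wakes every waiting coroutine
    await asyncio.gather(*waiters)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```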

Queue
asyncio.Queue is designed for asynchronous producer–consumer pipelines. Producers put items in the queue and consumers await them. When created with a maxsize, put() waits while the queue is full, which gives built-in backpressure and prevents uncontrolled task growth.
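
A minimal sketch of such a pipeline, using the same None sentinel as the multiprocessing Queue demo. The `producer` and `consumer` coroutines and the queue size of 2 are arbitrary choices for illustration.

```python
import asyncio


async def producer(queue: asyncio.Queue, items: list) -> None:
    for item in items:
        await queue.put(item)         # waits here whenever the queue is full
    await queue.put(None)             # sentinel: no more items


async def consumer(queue: asyncio.Queue) -> list:
    seen = []
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0.001)    # simulate slow processing
        seen.append(item * 2)
    return seen


async def main() -> list:
    queue = asyncio.Queue(maxsize=2)  # bounded queue: the producer cannot run ahead
    _, doubled = await asyncio.gather(
        producer(queue, [1, 2, 3, 4, 5]),
        consumer(queue),
    )
    return doubled


if __name__ == "__main__":
    print(asyncio.run(main()))        # [2, 4, 6, 8, 10]
```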

Cancellation & Timeouts
Asyncio supports cooperative cancellation. Tasks should be written to handle cancellation cleanly. asyncio.wait_for() or timeouts on APIs ensure long-running tasks don’t freeze the system.
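
A minimal sketch of both ideas together: `asyncio.wait_for()` cancels the task when the timeout expires, and the task gets a chance to clean up before the cancellation propagates. The `slow_job` coroutine and the `state` dict are made up for this example.

```python
import asyncio


async def slow_job(state: dict) -> str:
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        state["cleaned_up"] = True    # release resources here
        raise                         # re-raise so the cancellation propagates


async def main() -> tuple:
    state = {"cleaned_up": False}
    try:
        result = await asyncio.wait_for(slow_job(state), timeout=0.05)
    except asyncio.TimeoutError:
        result = "timed out"
    return result, state["cleaned_up"]


if __name__ == "__main__":
    print(asyncio.run(main()))        # ('timed out', True)
```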

Backpressure and Flow Control
Asyncio doesn’t magically solve overload problems. Use queues, semaphores, and proper design to avoid overwhelming external systems or your own application.


Comparison table for the concurrency models

| Aspect | Threading | Multiprocessing | Asyncio |
| --- | --- | --- | --- |
| Best For | I/O-bound tasks using blocking libraries | CPU-bound tasks needing true parallelism | Massive I/O concurrency using async APIs |
| Execution Model | Multiple OS threads in one process | Multiple independent processes | Single thread, event loop, cooperative multitasking |
| GIL Impact | Affected by GIL → only one thread executes Python bytecode at a time | Not affected (each process has its own interpreter & GIL) | Not affected in typical usage; only one coroutine runs at a time by design |
| Parallel CPU Execution | ❌ No real parallelism for Python code | ✅ True parallelism across CPU cores | ❌ No (unless explicitly offloading work) |
| I/O Handling | Works well because many I/O ops release GIL internally | Works but heavyweight for I/O | Best for non-blocking I/O (await) |
| Scalability | Moderate (dozens to hundreds of threads) | Good but expensive (RAM + process startup) | Excellent (thousands of concurrent tasks) |
| Memory Usage | Low to moderate | High (separate memory per process) | Very low |
| Complexity | Simple mental model | Moderate; requires IPC and data serialization | Higher learning curve; requires async-aware ecosystem |
| Good Use Cases | File I/O, blocking network calls, background workers, GUI apps | Data processing, ML workloads, CPU crunching, parallel computation | APIs, web servers, scraping at scale, chat apps, streaming |
| Bad Use Cases | CPU-bound workloads | Extremely lightweight tasks that don’t justify process overhead | CPU-bound work or blocking libraries |
| Typical Tools | threading, ThreadPoolExecutor | multiprocessing, ProcessPoolExecutor | asyncio, aiohttp, async DB drivers |
| Failure Mode if Misused | Thread explosion, overhead | High overhead, complex debugging, shared state pain | Event loop freezes if blocking code sneaks in |

Quick rule of thumb for choosing a concurrency model

  • I/O + blocking libs + manageable concurrency → Threading

  • CPU-bound, need real parallel speedup → Multiprocessing

  • Huge I/O concurrency + async ecosystem available → Asyncio