asyncio及其接口

asyncio及其接口

2025年5月2日
python
202505, asyncio

协程 #

import asyncio


async def one():
    """Coroutine that finishes immediately and produces the integer 1."""
    value = 1
    return value


async def greet(delay=2):
    """Return a greeting after pausing for ``delay`` seconds.

    Args:
        delay: Seconds to sleep before returning. Defaults to 2, the value
            that used to be hard-coded, so plain ``greet()`` is unchanged.

    Returns:
        The string ``'Hello world'``.
    """
    await asyncio.sleep(delay)
    return 'Hello world'


async def main():
    """Await ``one()`` and then ``greet()`` sequentially and print both results."""
    first = await one()
    second = await greet()

    for value in (first, second):
        print(value)

asyncio.run(main())

Task #

Task的cancelled接口 #

import asyncio


async def c():
    return 1


async def main():
    """Run ``c()`` as a task to completion, then print its cancelled() flag."""
    finished = asyncio.create_task(c())
    await finished

    was_cancelled = finished.cancelled()
    print(was_cancelled)


asyncio.run(main())

create_task #

create_task() 用于把协程包装成一个 Task 对象,并把它交给事件循环调度。任务不会在调用 create_task() 时立即执行,而是要等到当前协程遇到 await 让出控制权之后,才会被事件循环运行。

import asyncio


async def one():
    return 1


async def greet(timeout):
    """Print a start marker, sleep ``timeout`` seconds, then return a greeting."""
    print('run', timeout)
    await asyncio.sleep(timeout)

    message = f'Hello world {timeout}'
    return message


async def main():
    """Schedule six tasks up front, sleep 3 seconds, then print every result.

    Results are awaited and printed in the order the tasks were created.
    """
    scheduled = [asyncio.create_task(one())]
    for seconds in (2, 3, 4, 2, 3):
        scheduled.append(asyncio.create_task(greet(seconds)))

    print('sleep 3')
    await asyncio.sleep(3)
    print('sleep end')

    for task in scheduled:
        print(await task)


asyncio.run(main())

# sleep 3
# run 2
# run 3
# run 4
# run 2
# run 3
# sleep end
# 1
# Hello world 2
# Hello world 3
# Hello world 4
# Hello world 2
# Hello world 3

取消任务 #

主动取消捕获异常 #

import asyncio


async def greet(timeout):
    """Sleep for ``timeout`` seconds and then return ``'Hello world'``."""
    await asyncio.sleep(timeout)
    greeting = 'Hello world'
    return greeting


async def main():
    """Poll a 6-second task once per second and cancel it after 5 seconds.

    Prints the elapsed seconds on every tick, then either the task result or
    a notice that the task was cancelled.
    """
    long_task = asyncio.create_task(greet(6))
    elapsed = 0

    while True:
        if long_task.done():
            break

        await asyncio.sleep(1)
        elapsed += 1

        if elapsed == 5:
            long_task.cancel()

        print('Time passed:', elapsed)

    try:
        outcome = await long_task
    except asyncio.CancelledError:
        print('The long task cancelled')
    else:
        print(outcome)


asyncio.run(main())

设置超时时间并取消任务 #

import asyncio


async def greet(timeout):
    await asyncio.sleep(timeout)
    return 'Hello world'


async def main():
    """Wait at most 3 seconds for a 6-second task and report what happened.

    On timeout the task is awaited a second time so that its cancellation
    (if any) is reported as well.
    """
    long_task = asyncio.create_task(greet(6))

    try:
        outcome = await asyncio.wait_for(long_task, timeout=3)
    except asyncio.exceptions.TimeoutError:
        print('The long task took longer than 3 seconds')
        try:
            outcome = await long_task
        except asyncio.exceptions.CancelledError:
            print('The long task cancelled')
        else:
            print(outcome)
    else:
        print(outcome)


asyncio.run(main())

设置超时时间但不取消任务 #

import asyncio


async def greet(timeout):
    await asyncio.sleep(timeout)
    return 'Hello world'


async def main():
    long_task = asyncio.create_task(greet(6))

    try:
        result = await asyncio.wait_for(
            asyncio.shield(long_task),
            timeout=3
        )
        print(result)
    except asyncio.exceptions.TimeoutError:
        print('The long task took longer than 3 seconds')
        result = await long_task
        print(result)


asyncio.run(main())

asyncio.run #

coroutine未执行结束 #

import asyncio


async def coro(message):
    print(message)
    await asyncio.sleep(1)
    print(message)


async def main():
    print('-- main beginning')
    asyncio.create_task(coro('text'))  # 没有await
    await asyncio.sleep(0.5)
    print('-- main done')


asyncio.run(main())

# -- main beginning
# text
# -- main done

coroutine执行结束 #

import asyncio


async def coro(message):
    print(message)
    await asyncio.sleep(1)
    print(message)


async def main():
    print('-- main beginning')
    asyncio.create_task(coro('text'))  # 没有await
    await asyncio.sleep(1)
    print('-- main done')


asyncio.run(main())

# -- main beginning
# text
# -- main done
# text

查看所有任务 #

import asyncio


async def coro(message):
    print(message)
    await asyncio.sleep(1)
    print(message)


async def main():
    print(asyncio.all_tasks())

    print('-- main beginning')
    asyncio.create_task(coro('text'))
    print(asyncio.all_tasks())

    await asyncio.sleep(0.5)
    print('-- main done')


asyncio.run(main())

# {<Task pending name='Task-1' coro=<main() running at 2.py:11> cb=[_run_until_complete_cb() at base_events.py:184]>}
# -- main beginning
# {<Task pending name='Task-2' coro=<coro() running at 2.py:4>>, <Task pending name='Task-1' coro=<main() running at 2.py:15> cb=[_run_until_complete_cb() at base_events.py:184]>}
# text
# -- main done

ContextManager #

import asyncio
import aiohttp


class WriteToFile:
    """Synchronous context manager that opens ``filename`` for writing.

    The file is opened in mode ``'w'`` on entry and closed on exit.
    Exceptions raised inside the ``with`` body are not suppressed.
    """

    def __init__(self, filename):
        self.filename = filename
        # Defined up front so the guard in __exit__ is always safe to
        # evaluate, even if __enter__ never ran.
        self.file_object = None

    def __enter__(self):
        self.file_object = open(self.filename, 'w')
        return self.file_object

    def __exit__(self, exc_type, exc_value, traceback):
        if self.file_object:
            self.file_object.close()



class AsyncSession:
    """Async context manager that opens a client session and GETs ``url``.

    ``async with AsyncSession(url) as response`` yields the response of the
    GET request; the ``aiohttp.ClientSession`` is closed on exit.
    """

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            response = await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so the session
            # must be closed here. BaseException also covers cancellation.
            await self.session.close()
            raise
        return response

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()


async def check(url):
    """Fetch ``url`` via AsyncSession and print the first 20 characters of the body."""
    page = AsyncSession(url)

    async with page as response:
        html = await response.text()
        print(f'{url}: {html[:20]}')


async def main():
    await asyncio.create_task(check('https://facebook.com'))
    await asyncio.create_task(check('https://youtube.com'))
    await asyncio.create_task(check('https://google.com'))


asyncio.run(main())

group task by gather #

gather 捕获异常的任务, 等待所有任务执行结束 #

import asyncio
import aiohttp


class AsyncSession:
    """Async context manager that opens a client session and GETs ``url``.

    ``async with AsyncSession(url) as response`` yields the response of the
    GET request; the ``aiohttp.ClientSession`` is closed on exit.
    """

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            response = await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so the session
            # must be closed here. BaseException also covers cancellation.
            await self.session.close()
            raise
        return response

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()


class ServerError(Exception):
    """Exception whose string form is exactly the message it was built with."""

    def __init__(self, message):
        self._message = message

    def __str__(self):
        text = self._message
        return text


async def server_returns_error():
    await asyncio.sleep(3)
    raise ServerError('Failed to get data')


async def check(url):
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    coros = [
        check('https://www.baidu.com'),
        check('https://www.douyin.com'),
        check('https://www.bilibili.com')
    ]

    results = await asyncio.gather(
        *coros,
        server_returns_error(),
        return_exceptions=True
    )

    for res in results:
        print(res)


asyncio.run(main())

gather 只有部分任务结束 #

import asyncio
import aiohttp


class AsyncSession:
    """Async context manager that opens a client session and GETs ``url``.

    ``async with AsyncSession(url) as response`` yields the response of the
    GET request; the ``aiohttp.ClientSession`` is closed on exit.
    """

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            response = await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so the session
            # must be closed here. BaseException also covers cancellation.
            await self.session.close()
            raise
        return response

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()


class ServerError(Exception):
    def __init__(self, message):
        self._message = message

    def __str__(self):
        return self._message


async def server_returns_error():
    await asyncio.sleep(3)
    raise ServerError('Failed to get data')


async def check(url):
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    coros = [
        check('https://www.baidu.com'),
        check('https://www.douyin.com'),
        check('https://www.bilibili.com'),
    ]

    for coro in asyncio.as_completed(coros):
        result = await coro
        print(result)

    await asyncio.sleep(1)  # windows need to avoid loop close exception

asyncio.run(main())

gather gather #

import asyncio
import aiohttp


class AsyncSession:
    """Async context manager that opens a client session and GETs ``url``.

    ``async with AsyncSession(url) as response`` yields the response of the
    GET request; the ``aiohttp.ClientSession`` is closed on exit.
    """

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            response = await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so the session
            # must be closed here. BaseException also covers cancellation.
            await self.session.close()
            raise
        return response

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()


class ServerError(Exception):
    def __init__(self, message):
        self._message = message

    def __str__(self):
        return self._message


async def server_returns_error():
    await asyncio.sleep(3)
    raise ServerError('Failed to get data')


async def check(url):
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    group1 = asyncio.gather(
        check('https://www.baidu.com'),
        check('https://www.sina.com'),
    )

    group2 = asyncio.gather(
        check('https://www.douban.com'),
        check('https://www.bilibili.com')
    )
    print(type(group2))

    groups = asyncio.gather(group1, group2)

    results = await groups

    for res in results:
        print(res)

    await asyncio.sleep(1)

asyncio.run(main())

non-obvious issue #

有异步 #

import asyncio


async def nothing():
    print('Busy')


async def busy_loop():
    for i in range(10):  # while True: 会导致normal任务无法执行
        await nothing()


async def normal():
    """Print 'Normal coroutine' ten times; contains no await expression."""
    remaining = 10
    while remaining:
        print('Normal coroutine')
        remaining -= 1


async def main():
    await asyncio.gather(
        busy_loop(),
        normal()
    )


asyncio.run(main())

有异步 #

import asyncio


async def nothing():
    print('Busy')
    await asyncio.sleep(0)


async def busy_loop():
    for i in range(10):
        await nothing()


async def normal():
    for i in range(10):
        print('Normal coroutine')
        await asyncio.sleep(0)


async def main():
    t1 = asyncio.create_task(busy_loop())
    t2 = asyncio.create_task(normal())

    await t1
    await t2



asyncio.run(main())

没有异步 #

import asyncio


async def nothing():
    print('Busy')
    await asyncio.sleep(0)


async def busy_loop():
    for i in range(10):
        await nothing()


async def normal():
    for i in range(10):
        print('Normal coroutine')
        await asyncio.sleep(0)


async def main():
    await asyncio.create_task(busy_loop())
    await asyncio.create_task(normal())

asyncio.run(main())

TaskGroup #

需要 Python 3.11 及以上版本(asyncio.TaskGroup 自 3.11 起提供)

import asyncio
import aiohttp


class AsyncSession:
    """Async context manager that opens a client session and GETs ``url``.

    ``async with AsyncSession(url) as response`` yields the response of the
    GET request; the ``aiohttp.ClientSession`` is closed on exit.
    """

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            response = await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so the session
            # must be closed here. BaseException also covers cancellation.
            await self.session.close()
            raise
        return response

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()


async def check(url):
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    async with asyncio.TaskGroup() as tg:
        print(type(tg))
        print(dir(tg))
        print()

        task1 = tg.create_task(check('https://www.baidu.com'))
        task2 = tg.create_task(check('https://www.sina.com'))
        task3 = tg.create_task(check('https://www.bilibili.com'))

    # 不能移动到async with语句内, task还未结束
    print(task1.result())
    print(task2.result())
    print(task3.result())


asyncio.run(main())

gather with exception #

只能捕获第一个异常 #

import asyncio


async def coro_norm():
    return 'Hello world'


async def coro_value_error():
    raise ValueError


async def coro_type_error():
    raise TypeError


async def main():
    try:
        results = await asyncio.gather(
            coro_norm(),
            coro_value_error(),
            coro_type_error(),
        )

    except ValueError as e:
        print(f'{e=}')

    except TypeError as e:
        print(f'{e=}')

    else:
        print(f'{results=}')


asyncio.run(main())

不触发异常 #

import asyncio


async def coro_norm():
    return 'Hello world'


async def coro_value_error():
    raise ValueError


async def coro_type_error():
    raise TypeError


async def main():
    try:
        results = await asyncio.gather(
            coro_norm(),
            coro_value_error(),
            coro_type_error(),
            return_exceptions=True
        )

    except ValueError as e:
        print(f'{e=}')

    except TypeError as e:
        print(f'{e=}')

    else:
        print(f'{results=}')


asyncio.run(main())

TaskGroup with exception #

捕获所有异常 #

except* 用于捕获 ExceptionGroup 中的异常:TaskGroup 会把所有子任务抛出的异常打包成一个 ExceptionGroup,每个 except* 子句各自处理其中匹配的那一部分

import asyncio


async def coro_norm():
    return 'Hello world'


async def coro_value_error():
    raise ValueError


async def coro_type_error():
    raise TypeError


async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(coro_norm())
            task2 = tg.create_task(coro_value_error())
            task3 = tg.create_task(coro_type_error())

        results = [task1.result(), task2.result(), task3.result()]

    except* ValueError as e:
        print(f'{e=}')

    except* TypeError as e:
        print(f'{e=}')

    else:
        print(f'{results=}')


asyncio.run(main())

组任务cancel #

gather cancel #

import asyncio


async def coro_norm():
    print('coro_norm')
    return 'Hello world'


async def coro_value_error():
    raise ValueError


async def coro_long():
    try:
        print('Long task is running...')
        await asyncio.sleep(3)
        return 'Long task result'

    except asyncio.CancelledError:
        print('All needed actions are done')
        raise asyncio.CancelledError


async def main():
    """Gather three tasks, report the outcome, cancel leftovers, print final states.

    A ValueError raised by gather() is printed; any task still pending
    afterwards is cancelled by hand and named in the output. After a 2-second
    pause the internal state of each task is printed.
    """
    tasks = [
        asyncio.create_task(coro_norm()),
        asyncio.create_task(coro_value_error()),
        asyncio.create_task(coro_long(), name='coro_long'),
    ]
    task1, task2, task3 = tasks

    try:
        results = await asyncio.gather(*tasks)
    except ValueError as e:
        print(f'{e=}')
    else:
        print(f'{results=}')

    for task in tasks:
        if task.done():
            continue
        task.cancel()
        print(f'Pending: {task.get_name()}')

    await asyncio.sleep(2)
    print()
    print(f'{task1._state}')
    print(f'{task2._state}')
    print(f'{task3._state}')

asyncio.run(main())

TaskGroup cancel with exception #

TaskGroup 中只要有一个任务抛出异常,组内其余尚未完成的任务都会被取消

import asyncio


async def coro_norm():
    return 'Hello world'


async def coro_value_error():
    raise ValueError


async def coro_long():
    try:
        print('Long task is running...')
        await asyncio.sleep(2)
        return 'Long task result'

    except asyncio.CancelledError:
        print('All needed actions are done')
        raise asyncio.CancelledError


async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(coro_norm())
            task2 = tg.create_task(coro_value_error())
            task3 = tg.create_task(coro_long())

        results = [task1.result(), task2.result(), task3.result()]
    except* ValueError as e:
        print(f'{e=}')
    except* TypeError as e:
        print(f'{e=}')
    else:
        print(f'{results=}')

asyncio.run(main())

# Long task is running...
# All needed actions are done
# e=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError()])

async for #

import asyncio

from redis import asyncio as aioredis


class A:
    """Synchronous iterator that yields 1, 2, 3 and restarts on every iter() call."""

    def __iter__(self):
        self.x = 0
        return self

    def __next__(self):
        # Guard-clause form: keep counting while the counter is at most 2.
        if self.x <= 2:
            self.x += 1
            return self.x

        raise StopIteration


class RedisReader:
    """Async iterator that yields the stored value for each key in ``keys``.

    Each step opens a client from ``redis`` as an async context manager and
    awaits ``get(key)`` on it.
    """

    def __init__(self, redis, keys):
        self.redis = redis
        self.keys = keys

    def __aiter__(self):
        self.ikeys = iter(self.keys)
        return self

    async def __anext__(self):
        exhausted = object()
        key = next(self.ikeys, exhausted)
        if key is exhausted:
            raise StopAsyncIteration

        async with self.redis.client() as connection:
            return await connection.get(key)


async def main():
    """Print the values stored under a fixed list of keys in a local redis.

    The client is closed in ``finally`` so the connection is released even if
    reading a key fails.
    """
    redis = await aioredis.from_url('redis://localhost')

    keys = ['morston', 'morgan', 'vader', 'lennon']

    try:
        async for name in RedisReader(redis, keys):
            print(name)
    finally:
        await redis.aclose()


asyncio.run(main())

async comprehension #

import asyncio

from faker import Faker


faker = Faker('en_US')


async def get_user(n=1):
    """Asynchronously yield ``n`` fake ``(first name, last name)`` pairs.

    First and last names are generated separately instead of splitting a
    full name: a generated full name may carry a prefix or suffix
    (e.g. "Dr." / "Jr."), which would break two-value unpacking.

    Args:
        n: Number of pairs to produce.

    Yields:
        A ``(name, surname)`` tuple after a 0.1-second pause per item.
    """
    for _ in range(n):
        await asyncio.sleep(0.1)
        yield faker.first_name_male(), faker.last_name()


async def main():
    users_list = [name async for name in get_user(3)]
    # print(users_list)

    users_dict = {name: surname async for name, surname in get_user(3)}
    # print(users_dict)

    users_set = {name async for name in get_user(3)}
    print(users_set)


asyncio.run(main())

async context manager #

import asyncio
from contextlib import contextmanager, asynccontextmanager

from redis import asyncio as aioredis


@contextmanager
def custom_open(filename, mode='w'):
    """Open ``filename`` and guarantee that it is closed on exit.

    Args:
        filename: Path of the file to open.
        mode: Mode string passed to ``open``; defaults to ``'w'``.

    Yields:
        The open file object.
    """
    file_obj = open(filename, mode)
    try:
        yield file_obj
    finally:
        # Runs even when the with-body raises, so the handle never leaks.
        file_obj.close()


@asynccontextmanager
async def redis_connection():
    """Yield a redis client for ``redis://localhost`` and close it on exit."""
    # Connect outside the try block: if this fails there is nothing to
    # close, and the finally clause must not touch an unbound ``redis``.
    redis = await aioredis.from_url('redis://localhost')
    try:
        yield redis
    finally:
        await redis.aclose()


async def main():
    # with custom_open('file.txt') as f:
    #     f.write('Hello world')

    async with redis_connection() as redis:
        await redis.set('my_key', 'asyncio course')


asyncio.run(main())

async Queue #

import asyncio
from random import randint


class C:
    norm = '\033[0m'
    blue = '\033[94m'
    green = '\033[92m'

c = C()


async def producer(queue, name):
    timeout = randint(1, 5)
    await queue.put(timeout)
    print(f'{c.blue}Producer {name} put {timeout} to the queue: {queue}{c.norm}')


async def consumer(queue, name):
    while True:
        timeout = await queue.get()
        await asyncio.sleep(timeout)
        print(f'{c.green}Consumer {name} ate {timeout}, queue: {queue}{c.norm}')
        queue.task_done()


async def main():
    """Run 12 producers and 4 consumers over a queue bounded to 3 items.

    Waits for every producer to finish and for the queue to be fully
    processed, then cancels the (endless) consumers and waits for the
    cancellations to complete.
    """
    queue = asyncio.Queue(maxsize=3)

    producers = [
        asyncio.create_task(producer(queue, name=i))
        for i in range(12)
    ]
    consumers = [
        asyncio.create_task(consumer(queue, name=i))
        for i in range(4)
    ]

    await asyncio.gather(*producers)
    await queue.join()

    # ``task`` rather than ``c``: ``c`` is the module-level colour palette
    # that producer() and consumer() read.
    for task in consumers:
        task.cancel()

    # Let the cancellations finish; return_exceptions keeps the resulting
    # CancelledError objects from propagating out of main().
    await asyncio.gather(*consumers, return_exceptions=True)


asyncio.run(main())

asyncio Queue example #

import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp
import aiofiles
from bs4 import BeautifulSoup


async def make_request(url, session):
    """GET ``url`` with ``session`` and return the response if it succeeded.

    Returns:
        The response object when ``response.ok`` is true. Otherwise the
        status is printed, the response is released and ``None`` is
        returned, so callers must check for ``None``.
    """
    response = await session.get(url)

    if response.ok:
        return response

    print(f'{url} returned: {response.status}')
    # Nobody will read this response, so hand its connection back instead of
    # leaving it checked out.
    response.release()
    return None


async def get_image_page(queue, session):
    """Request a random comic page and enqueue the URL the request ended at.

    Nothing is enqueued when the request fails (``make_request`` returns
    ``None`` in that case).
    """
    url = 'https://c.xkcd.com/random/comic/'
    response = await make_request(url, session)

    if response is None:
        return

    await queue.put(response.url)


def _parse_link(html):
    soup = BeautifulSoup(html, 'lxml')
    image_link = 'https:' + soup.select_one('div#comic>img').get('src')
    return image_link


async def get_image_url(pages_queue, image_urls_queue, session):
    """Worker: turn comic page URLs into image URLs.

    Endlessly takes a page URL from ``pages_queue``, downloads the page,
    parses it in a process pool and puts the image link on
    ``image_urls_queue``. Runs until the task is cancelled.
    """
    loop = asyncio.get_running_loop()

    # One pool for the lifetime of the worker instead of a new pool (and new
    # worker processes) for every single page.
    with ProcessPoolExecutor() as pool:
        while True:
            url = await pages_queue.get()
            try:
                response = await make_request(url, session)
                if response is None:
                    # Failed request: nothing to parse for this page.
                    continue

                html = await response.text()
                image_link = await loop.run_in_executor(
                    pool,
                    _parse_link,
                    html
                )
                await image_urls_queue.put(image_link)
            finally:
                # Always balance get() with task_done(); otherwise a failure
                # would make pages_queue.join() wait forever.
                pages_queue.task_done()


async def download_image(image_urls_queue, session):
    """Worker: download every image URL taken from ``image_urls_queue``.

    Each image is streamed in 1024-byte chunks into a file named after the
    last path segment of its URL. Runs until the task is cancelled.
    """
    while True:
        url = await image_urls_queue.get()
        try:
            response = await make_request(url, session)
            if response is None:
                # Failed request: skip this image.
                continue

            filename = url.split('/')[-1]
            async with aiofiles.open(filename, 'wb') as f:
                async for chunk in response.content.iter_chunked(1024):
                    await f.write(chunk)
        finally:
            # Always balance get() with task_done(); otherwise a failure
            # would make image_urls_queue.join() wait forever.
            image_urls_queue.task_done()



async def main():
    """Run the three-stage pipeline: page getters -> URL getters -> downloaders.

    Four tasks are started per stage. Once the page getters have finished and
    a queue has been fully processed, the endless workers consuming that
    queue are cancelled.
    """
    pages_queue = asyncio.Queue()
    image_urls_queue = asyncio.Queue()

    # async with closes the session even if a stage raises.
    async with aiohttp.ClientSession() as session:
        page_getters = [
            asyncio.create_task(get_image_page(pages_queue, session))
            for _ in range(4)
        ]
        url_getters = [
            asyncio.create_task(
                get_image_url(pages_queue, image_urls_queue, session)
            )
            for _ in range(4)
        ]
        downloaders = [
            asyncio.create_task(download_image(image_urls_queue, session))
            for _ in range(4)
        ]

        await asyncio.gather(*page_getters)

        await pages_queue.join()
        # The URL getters are the endless consumers of pages_queue; the page
        # getters have already finished at this point.
        for getter in url_getters:
            getter.cancel()

        await image_urls_queue.join()
        for downloader in downloaders:
            downloader.cancel()

        # Wait for the cancellations to complete before the session closes.
        await asyncio.gather(
            *url_getters, *downloaders, return_exceptions=True
        )

        print(image_urls_queue)


asyncio.run(main())

asyncio Queue example refactor #

import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp
import aiofiles
from bs4 import BeautifulSoup


async def make_request(url, session):
    """GET ``url`` with ``session`` and return the response if it succeeded.

    Returns:
        The response object when ``response.ok`` is true. Otherwise the
        status is printed, the response is released and ``None`` is
        returned, so callers must check for ``None``.
    """
    response = await session.get(url)

    if response.ok:
        return response

    print(f'{url} returned: {response.status}')
    # Nobody will read this response, so hand its connection back instead of
    # leaving it checked out.
    response.release()
    return None


async def get_image_page(queue, session):
    """Request a random comic page and enqueue the URL the request ended at.

    Nothing is enqueued when the request fails (``make_request`` returns
    ``None`` in that case).
    """
    url = 'https://c.xkcd.com/random/comic/'
    response = await make_request(url, session)

    if response is None:
        return

    await queue.put(response.url)


def _parse_link(html):
    soup = BeautifulSoup(html, 'lxml')
    image_link = 'https:' + soup.select_one('div#comic>img').get('src')
    return image_link


async def get_image_url(pages_queue, image_urls_queue, session):
    """Worker: turn comic page URLs into image URLs.

    Endlessly takes a page URL from ``pages_queue``, downloads the page,
    parses it in a process pool and puts the image link on
    ``image_urls_queue``. Runs until the task is cancelled.
    """
    loop = asyncio.get_running_loop()

    # One pool for the lifetime of the worker instead of a new pool (and new
    # worker processes) for every single page.
    with ProcessPoolExecutor() as pool:
        while True:
            url = await pages_queue.get()
            try:
                response = await make_request(url, session)
                if response is None:
                    # Failed request: nothing to parse for this page.
                    continue

                html = await response.text()
                image_link = await loop.run_in_executor(
                    pool,
                    _parse_link,
                    html
                )
                await image_urls_queue.put(image_link)
            finally:
                # Always balance get() with task_done(); otherwise a failure
                # would make pages_queue.join() wait forever.
                pages_queue.task_done()


async def download_image(image_urls_queue, session):
    """Worker: download every image URL taken from ``image_urls_queue``.

    Each image is streamed in 1024-byte chunks into a file named after the
    last path segment of its URL. Runs until the task is cancelled.
    """
    while True:
        url = await image_urls_queue.get()
        try:
            response = await make_request(url, session)
            if response is None:
                # Failed request: skip this image.
                continue

            filename = url.split('/')[-1]
            async with aiofiles.open(filename, 'wb') as f:
                async for chunk in response.content.iter_chunked(1024):
                    await f.write(chunk)
        finally:
            # Always balance get() with task_done(); otherwise a failure
            # would make image_urls_queue.join() wait forever.
            image_urls_queue.task_done()


def cancel_tasks(tasks):
    """Request cancellation of every task in ``tasks``.

    Args:
        tasks: Iterable of objects exposing ``cancel()``.
    """
    # A plain loop: the calls are made for their side effect, so there is no
    # reason to build a throwaway list of their return values.
    for task in tasks:
        task.cancel()


def create_tasks(number_of_workers, coro, *args):
    """Start ``number_of_workers`` tasks that each run ``coro(*args)``.

    Must be called while an event loop is running.

    Args:
        number_of_workers: How many tasks to create.
        coro: Coroutine function to call once per task.
        *args: Positional arguments passed to every ``coro`` call.

    Returns:
        A list containing every created task (empty when
        ``number_of_workers`` is 0).
    """
    return [
        asyncio.create_task(coro(*args))
        for _ in range(number_of_workers)
    ]


async def main():
    """Run the three-stage pipeline: page getters -> URL getters -> downloaders.

    Four workers are requested per stage. Once the page getters have finished
    and a queue has been fully processed, the endless workers consuming that
    queue are cancelled.
    """
    pages_queue = asyncio.Queue()
    image_urls_queue = asyncio.Queue()

    # async with closes the session even if a stage raises.
    async with aiohttp.ClientSession() as session:
        page_getters = create_tasks(4, get_image_page, pages_queue, session)
        url_getters = create_tasks(
            4, get_image_url, pages_queue, image_urls_queue, session
        )
        downloaders = create_tasks(
            4, download_image, image_urls_queue, session
        )

        await asyncio.gather(*page_getters)

        await pages_queue.join()
        # The URL getters are the endless consumers of pages_queue; the page
        # getters have already finished at this point.
        cancel_tasks(url_getters)

        await image_urls_queue.join()
        cancel_tasks(downloaders)

        # Wait for the cancellations to complete before the session closes.
        await asyncio.gather(
            *url_getters, *downloaders, return_exceptions=True
        )

        print(image_urls_queue)


asyncio.run(main())

asyncio Lock #

import asyncio
import time

import aiohttp


lock = asyncio.Lock()


async def make_request(url):
    async with aiohttp.ClientSession() as session:

        async with lock:

            async with session.get(url) as response:
                data = await response.json()
                print(data)
                await asyncio.sleep(0.5)


async def without_lock(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
            print(data)


async def get_data(url):
    await make_request(url)


async def main():
    start = time.monotonic()

    tasks = [
        asyncio.create_task(get_data('http://localhost:8000'))
        for _ in range(20)
    ]

    wo_locks = [
        asyncio.create_task(
            without_lock('http://localhost:8000/hello')
        )
        for _ in range(3)
    ]

    await asyncio.gather(*tasks, *wo_locks)

    print(time.monotonic() - start)


asyncio.run(main())

asyncio Semaphore #

import asyncio
import time

import aiohttp


semaphore = asyncio.Semaphore(4)


def limit_rate(calls_limit=5, timeout=5):

    def wrapper(coro):
        semaphore = asyncio.Semaphore(calls_limit)

        async def wait():
            try:
                await asyncio.sleep(timeout)
            finally:
                semaphore.release()

        async def inner_coro(*args, **kwargs):
            await semaphore.acquire()
            asyncio.create_task(wait())

            return await coro(*args, **kwargs)

        return inner_coro

    return wrapper


@limit_rate(calls_limit=5, timeout=5)
async def make_request(url):
    async with aiohttp.ClientSession() as session:

        async with semaphore:

            async with session.get(url) as response:
                data = await response.json()
                print(data)
                await asyncio.sleep(0.5)
                print('------')


async def get_data(url):
    await make_request(url)


async def main():
    start = time.monotonic()

    tasks = [
        asyncio.create_task(get_data('http://localhost:8000'))
        for _ in range(20)
    ]

    await asyncio.gather(*tasks)

    print(time.monotonic() - start)


asyncio.run(main())

asyncio Event #

import asyncio

from random import randint


async def worker(event):
    """Block until ``event`` is set, then print a random number from 1 to 5."""
    print('Before the wait()')
    await event.wait()

    if not event.is_set():
        return

    print(f'Event is set, random number: {randint(1, 5)}')


async def coro(event):

    timeout = randint(3, 5)
    await asyncio.sleep(timeout)

    print(f'Event was set by coro after {timeout} sec.')
    event.set()


async def main():
    """Start five workers waiting on one Event plus the coroutine that sets it."""
    event = asyncio.Event()

    tasks = [
        asyncio.create_task(worker(event))
        for _ in range(5)
    ]

    # Keep a reference to the setter task: the event loop holds only weak
    # references to tasks, so an unreferenced one may be garbage-collected
    # before it has run to completion.
    setter = asyncio.create_task(coro(event))

    # Awaiting the setter as well surfaces any exception it raises.
    await asyncio.gather(*tasks, setter)




asyncio.run(main())

asyncio Condition #

import asyncio
from random import randint


async def waiter(condition, id):
    async with condition:
        print(f'Waiter {id} is awaiting')
        await condition.wait()

        num = randint(1, 5)
        print(f'Waiter {id} generated {num}')


async def starter(condition):
    print('Waiting for 5 seconds')
    await asyncio.sleep(5)

    async with condition:
        condition.notify(2)


async def main():
    """Start five waiters on one Condition plus the task that notifies them."""
    condition = asyncio.Condition()

    waiters = [
        asyncio.create_task(waiter(condition, id=i))
        for i in range(5)
    ]

    # Keep a reference to the notifier task: the event loop holds only weak
    # references to tasks, so an unreferenced one may be garbage-collected
    # before it has run to completion.
    starter_task = asyncio.create_task(starter(condition))

    # NOTE(review): starter() in this snippet calls notify(2) while five
    # waiters are created here, so this gather() may never finish - confirm
    # whether that is the intended demonstration.
    await asyncio.gather(*waiters)


asyncio.run(main())

asyncio Condition example #

import asyncio
import time

import aiohttp



async def make_request(url, condition):
    async with aiohttp.ClientSession() as session:

        async with condition:
            await condition.wait()

            async with session.get(url) as response:
                data = await response.json()
                print(data)
                await asyncio.sleep(0.5)


async def done(condition):
    print('Will start after 5 seconds')
    await asyncio.sleep(5)
    async with condition:
        condition.notify_all()


async def without_lock(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
            print(data)


async def get_data(url, condition):
    await make_request(url, condition)


async def main():
    """Start 20 condition-gated requests, 3 ungated ones and the notifier.

    The gated requests wait on a shared Condition until ``done()`` notifies
    them; the ungated requests start right away.
    """
    condition = asyncio.Condition()

    tasks = [
        asyncio.create_task(
            get_data(
                'http://localhost:8000',
                condition
            )
        )
        for _ in range(20)
    ]

    wo_locks = [
        asyncio.create_task(
            without_lock('http://localhost:8000/hello')
        )
        for _ in range(3)
    ]

    # Keep a reference to the notifier task: the event loop holds only weak
    # references to tasks, so an unreferenced one may be garbage-collected
    # before it has run to completion.
    notifier = asyncio.create_task(done(condition))

    # Awaiting the notifier as well surfaces any exception it raises.
    await asyncio.gather(*tasks, *wo_locks, notifier)


asyncio.run(main())