asyncio及其接口
2025年5月2日
协程 #
import asyncio
async def one():
    """Return the constant ``1``; the simplest possible coroutine."""
    return 1


async def greet():
    """Pause for two seconds, then hand back a greeting string."""
    await asyncio.sleep(2)
    return 'Hello world'


async def main():
    """Await ``one`` and then ``greet`` one after the other and print both results."""
    number = await one()
    greeting = await greet()
    for value in (number, greeting):
        print(value)
asyncio.run(main())
Task #
Task的cancelled接口 #
import asyncio
async def c():
    """Trivial coroutine that immediately returns ``1``."""
    return 1


async def main():
    """Run ``c`` as a task to completion, then print whether it was cancelled."""
    finished = asyncio.create_task(c())
    await finished
    was_cancelled = finished.cancelled()
    print(was_cancelled)
asyncio.run(main())
create_task #
create_task() 用于把协程包装成一个新的 Task 对象,并将其加入事件循环的调度队列。任务不会立即开始执行,要等当前协程遇到 await 让出控制权之后,事件循环才会运行它。
import asyncio
async def one():
    """Return ``1`` without ever suspending."""
    return 1


async def greet(timeout):
    """Announce the start, sleep for ``timeout`` seconds and return a greeting.

    The returned string embeds ``timeout`` so each call can be told apart.
    """
    print('run', timeout)
    await asyncio.sleep(timeout)
    return f'Hello world {timeout}'


async def main():
    """Create six tasks up front, sleep three seconds, then collect them in order.

    The tasks are only scheduled by ``create_task``; they first get to run
    when ``main`` suspends at its own ``sleep``.
    """
    first = asyncio.create_task(one())
    greeters = [asyncio.create_task(greet(delay)) for delay in (2, 3, 4, 2, 3)]
    print('sleep 3')
    await asyncio.sleep(3)
    print('sleep end')
    for task in (first, *greeters):
        print(await task)
asyncio.run(main())
# sleep 3
# run 2
# run 3
# run 4
# run 2
# run 3
# sleep end
# 1
# Hello world 2
# Hello world 3
# Hello world 4
# Hello world 2
# Hello world 3
取消任务 #
主动取消捕获异常 #
import asyncio
async def greet(timeout):
    """Sleep for ``timeout`` seconds and return ``'Hello world'``."""
    await asyncio.sleep(timeout)
    return 'Hello world'


async def main():
    """Poll a six-second task once per second and cancel it at the fifth tick.

    After the polling loop ends the task is awaited, and the resulting
    ``CancelledError`` is caught and reported.
    """
    long_task = asyncio.create_task(greet(6))
    elapsed = 0
    while True:
        if long_task.done():
            break
        await asyncio.sleep(1)
        elapsed += 1
        if elapsed == 5:
            long_task.cancel()
        print('Time passed:', elapsed)
    try:
        print(await long_task)
    except asyncio.CancelledError:
        print('The long task cancelled')
asyncio.run(main())
设置超时时间并取消任务 #
import asyncio
async def greet(timeout):
    """Sleep for ``timeout`` seconds and return ``'Hello world'``."""
    await asyncio.sleep(timeout)
    return 'Hello world'


async def main():
    """Give a six-second task three seconds to finish via ``wait_for``.

    ``wait_for`` is handed the task itself, so on timeout the task is
    cancelled; awaiting it again afterwards raises ``CancelledError``, which
    is caught and reported.
    """
    long_task = asyncio.create_task(greet(6))
    try:
        print(await asyncio.wait_for(long_task, timeout=3))
    except asyncio.exceptions.TimeoutError:
        print('The long task took longer than 3 seconds')
        try:
            print(await long_task)
        except asyncio.exceptions.CancelledError:
            print('The long task cancelled')
asyncio.run(main())
设置超时时间但不取消任务 #
import asyncio
async def greet(timeout):
    """Sleep for ``timeout`` seconds and return ``'Hello world'``."""
    await asyncio.sleep(timeout)
    return 'Hello world'


async def main():
    """Time out after three seconds without cancelling the underlying task.

    The task is wrapped in ``asyncio.shield`` before being passed to
    ``wait_for``, so after the timeout it can still be awaited to completion.
    """
    long_task = asyncio.create_task(greet(6))
    protected = asyncio.shield(long_task)
    try:
        print(await asyncio.wait_for(protected, timeout=3))
    except asyncio.exceptions.TimeoutError:
        print('The long task took longer than 3 seconds')
        print(await long_task)
asyncio.run(main())
asyncio.run #
coroutine未执行结束 #
import asyncio
async def coro(message):
    """Print ``message``, sleep for one second, then print it again."""
    print(message)
    await asyncio.sleep(1)
    print(message)
async def main():
    """Spawn ``coro`` as a task without awaiting it and return after 0.5 s.

    ``main`` sleeps for less time than the task does, so it returns while the
    task is still inside its one-second sleep and the task's second ``print``
    has not happened yet.
    """
    print('-- main beginning')
    asyncio.create_task(coro('text'))  # fire-and-forget: the task is never awaited
    await asyncio.sleep(0.5)
    print('-- main done')
asyncio.run(main())
# -- main beginning
# text
# -- main done
coroutine执行结束 #
import asyncio
async def coro(message):
    """Print ``message``, sleep for one second, then print it again."""
    print(message)
    await asyncio.sleep(1)
    print(message)
async def main():
    """Spawn ``coro`` as a task without awaiting it and return after 1 s.

    NOTE(review): ``main`` and the task both sleep for exactly one second, so
    whether the task's second ``print`` runs before the loop shuts down
    depends on how the two timers are ordered — confirm on the target platform.
    """
    print('-- main beginning')
    asyncio.create_task(coro('text'))  # fire-and-forget: the task is never awaited
    await asyncio.sleep(1)
    print('-- main done')
asyncio.run(main())
# -- main beginning
# text
# -- main done
# text
查看所有任务 #
import asyncio
async def coro(message):
    """Print ``message``, sleep for one second, then print it again."""
    print(message)
    await asyncio.sleep(1)
    print(message)
async def main():
    """Print ``asyncio.all_tasks()`` before and after creating a task.

    The second snapshot is taken right after ``create_task`` and before
    ``main`` awaits anything, i.e. before the new task has had a chance to run.
    """
    print(asyncio.all_tasks())
    print('-- main beginning')
    asyncio.create_task(coro('text'))
    print(asyncio.all_tasks())
    await asyncio.sleep(0.5)
    print('-- main done')
asyncio.run(main())
# {<Task pending name='Task-1' coro=<main() running at 2.py:11> cb=[_run_until_complete_cb() at base_events.py:184]>}
# -- main beginning
# {<Task pending name='Task-2' coro=<coro() running at 2.py:4>>, <Task pending name='Task-1' coro=<main() running at 2.py:15> cb=[_run_until_complete_cb() at base_events.py:184]>}
# text
# -- main done
ContextManager #
import asyncio
import aiohttp
class WriteToFile:
    """Synchronous context manager that opens ``filename`` for writing.

    Entering yields the open file object; leaving closes it.
    """

    def __init__(self, filename):
        self.filename = filename

    def __enter__(self):
        handle = open(self.filename, 'w')
        self.file_object = handle
        return handle

    def __exit__(self, exc_type, exc_value, traceback):
        handle = self.file_object
        if not handle:
            return
        handle.close()
class AsyncSession:
    """Async context manager that opens a client session and GETs ``url``.

    Entering creates an ``aiohttp.ClientSession``, issues the GET request and
    yields the response; leaving closes the session.
    """

    def __init__(self, url):
        self.url = url
        self.session = None  # created lazily in __aenter__

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            return await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so the session
            # has to be closed here or it would leak.
            await self.session.close()
            raise

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()
async def check(url):
    """Fetch ``url`` and print it together with the first 20 characters of the body."""
    async with AsyncSession(url) as response:
        html = await response.text()
        print(f'{url}: {html[:20]}')


async def main():
    """Check three sites one after another, each wrapped in its own task."""
    for site in ('https://facebook.com', 'https://youtube.com', 'https://google.com'):
        await asyncio.create_task(check(site))
asyncio.run(main())
group task by gather #
gather 捕获异常的任务, 等待所有任务执行结束 #
import asyncio
import aiohttp
class AsyncSession:
    """Async context manager that opens a client session and GETs ``url``.

    Entering creates an ``aiohttp.ClientSession``, issues the GET request and
    yields the response; leaving closes the session.
    """

    def __init__(self, url):
        self.url = url
        self.session = None  # created lazily in __aenter__

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            return await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so the session
            # has to be closed here or it would leak.
            await self.session.close()
            raise

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()
class ServerError(Exception):
    """Error whose ``str()`` is exactly the message it was built with."""

    def __init__(self, message):
        self._message = message

    def __str__(self):
        text = self._message
        return text


async def server_returns_error():
    """Simulate a slow failing backend: wait three seconds, then raise ``ServerError``."""
    await asyncio.sleep(3)
    raise ServerError('Failed to get data')


async def check(url):
    """Fetch ``url`` and return it with the first 15 characters of the body."""
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    """Gather three page checks plus a failing coroutine and print every outcome.

    ``return_exceptions=True`` makes ``gather`` return the raised exception as
    a result item instead of propagating it.
    """
    urls = (
        'https://www.baidu.com',
        'https://www.douyin.com',
        'https://www.bilibili.com',
    )
    outcomes = await asyncio.gather(
        *[check(url) for url in urls],
        server_returns_error(),
        return_exceptions=True,
    )
    for outcome in outcomes:
        print(outcome)
asyncio.run(main())
gather 只有部分任务结束 #
import asyncio
import aiohttp
class AsyncSession:
    """Async context manager that opens a client session and GETs ``url``.

    Entering creates an ``aiohttp.ClientSession``, issues the GET request and
    yields the response; leaving closes the session.
    """

    def __init__(self, url):
        self.url = url
        self.session = None  # created lazily in __aenter__

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            return await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so the session
            # has to be closed here or it would leak.
            await self.session.close()
            raise

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()
class ServerError(Exception):
    """Error whose ``str()`` is exactly the message it was built with."""

    def __init__(self, message):
        self._message = message

    def __str__(self):
        text = self._message
        return text


async def server_returns_error():
    """Simulate a slow failing backend: wait three seconds, then raise ``ServerError``."""
    await asyncio.sleep(3)
    raise ServerError('Failed to get data')


async def check(url):
    """Fetch ``url`` and return it with the first 15 characters of the body."""
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    """Print each page check as soon as it finishes, using ``as_completed``."""
    pending = [
        check(url)
        for url in (
            'https://www.baidu.com',
            'https://www.douyin.com',
            'https://www.bilibili.com',
        )
    ]
    for next_done in asyncio.as_completed(pending):
        print(await next_done)
    # Needed on Windows to avoid a loop-close exception.
    await asyncio.sleep(1)
asyncio.run(main())
gather gather #
import asyncio
import aiohttp
class AsyncSession:
    """Async context manager that opens a client session and GETs ``url``.

    Entering creates an ``aiohttp.ClientSession``, issues the GET request and
    yields the response; leaving closes the session.
    """

    def __init__(self, url):
        self.url = url
        self.session = None  # created lazily in __aenter__

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            return await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so the session
            # has to be closed here or it would leak.
            await self.session.close()
            raise

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()
class ServerError(Exception):
    """Error whose ``str()`` is exactly the message it was built with."""

    def __init__(self, message):
        self._message = message

    def __str__(self):
        text = self._message
        return text


async def server_returns_error():
    """Simulate a slow failing backend: wait three seconds, then raise ``ServerError``."""
    await asyncio.sleep(3)
    raise ServerError('Failed to get data')


async def check(url):
    """Fetch ``url`` and return it with the first 15 characters of the body."""
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    """Nest two ``gather`` groups inside a third and print each group's results."""
    first_group = asyncio.gather(
        check('https://www.baidu.com'),
        check('https://www.sina.com'),
    )
    second_group = asyncio.gather(
        check('https://www.douban.com'),
        check('https://www.bilibili.com'),
    )
    print(type(second_group))
    combined = asyncio.gather(first_group, second_group)
    for group_result in await combined:
        print(group_result)
    await asyncio.sleep(1)
asyncio.run(main())
non-obvious issue #
有异步 #
import asyncio
async def nothing():
    """Print ``'Busy'``; never suspends because it awaits nothing."""
    print('Busy')


async def busy_loop():
    """Await ``nothing`` ten times in a row."""
    # With ``while True:`` here instead, normal() would never get to run.
    remaining = 10
    while remaining:
        await nothing()
        remaining -= 1


async def normal():
    """Print ``'Normal coroutine'`` ten times without suspending."""
    for _ in range(10):
        print('Normal coroutine')


async def main():
    """Run ``busy_loop`` and ``normal`` together through ``gather``."""
    jobs = (busy_loop(), normal())
    await asyncio.gather(*jobs)
asyncio.run(main())
有异步 #
import asyncio
async def nothing():
    """Print ``'Busy'`` and then yield to the event loop via ``sleep(0)``."""
    print('Busy')
    await asyncio.sleep(0)


async def busy_loop():
    """Await ``nothing`` ten times in a row."""
    remaining = 10
    while remaining:
        await nothing()
        remaining -= 1


async def normal():
    """Print ``'Normal coroutine'`` ten times, yielding to the loop after each."""
    for _ in range(10):
        print('Normal coroutine')
        await asyncio.sleep(0)


async def main():
    """Start both coroutines as tasks first, then await them in creation order."""
    running = [asyncio.create_task(busy_loop()), asyncio.create_task(normal())]
    for task in running:
        await task
asyncio.run(main())
没有异步 #
import asyncio
async def nothing():
    """Print ``'Busy'`` and then yield to the event loop via ``sleep(0)``."""
    print('Busy')
    await asyncio.sleep(0)


async def busy_loop():
    """Await ``nothing`` ten times in a row."""
    remaining = 10
    while remaining:
        await nothing()
        remaining -= 1


async def normal():
    """Print ``'Normal coroutine'`` ten times, yielding to the loop after each."""
    for _ in range(10):
        print('Normal coroutine')
        await asyncio.sleep(0)


async def main():
    """Create and await each task in turn, so the second starts after the first ends."""
    for job in (busy_loop, normal):
        await asyncio.create_task(job())
asyncio.run(main())
TaskGroup #
需要3.11以上版本
import asyncio
import aiohttp
class AsyncSession:
    """Async context manager that opens a client session and GETs ``url``.

    Entering creates an ``aiohttp.ClientSession``, issues the GET request and
    yields the response; leaving closes the session.
    """

    def __init__(self, url):
        self.url = url
        self.session = None  # created lazily in __aenter__

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            return await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so the session
            # has to be closed here or it would leak.
            await self.session.close()
            raise

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()
async def check(url):
    """Fetch ``url`` and return it with the first 15 characters of the body."""
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    """Run three checks inside a ``TaskGroup`` and print their results afterwards."""
    sites = (
        'https://www.baidu.com',
        'https://www.sina.com',
        'https://www.bilibili.com',
    )
    async with asyncio.TaskGroup() as tg:
        print(type(tg))
        print(dir(tg))
        print()
        tasks = [tg.create_task(check(site)) for site in sites]
    # Reading the results must stay outside the ``async with`` block: inside
    # it the tasks have not finished yet.
    for task in tasks:
        print(task.result())
asyncio.run(main())
gather with exception #
只能捕获第一个异常 #
import asyncio
async def coro_norm():
    """Succeed immediately with ``'Hello world'``."""
    return 'Hello world'


async def coro_value_error():
    """Fail immediately with ``ValueError``."""
    raise ValueError


async def coro_type_error():
    """Fail immediately with ``TypeError``."""
    raise TypeError


async def main():
    """Gather the three coroutines and print either the error or the results.

    Without ``return_exceptions`` only the first exception raised reaches the
    caller.
    """
    try:
        results = await asyncio.gather(
            coro_norm(),
            coro_value_error(),
            coro_type_error(),
        )
    except (ValueError, TypeError) as e:
        print(f'{e=}')
        return
    print(f'{results=}')
asyncio.run(main())
不触发异常 #
import asyncio
async def coro_norm():
    """Succeed immediately with ``'Hello world'``."""
    return 'Hello world'


async def coro_value_error():
    """Fail immediately with ``ValueError``."""
    raise ValueError


async def coro_type_error():
    """Fail immediately with ``TypeError``."""
    raise TypeError


async def main():
    """Gather the three coroutines with ``return_exceptions=True`` and print the list.

    Exceptions come back as items of the result list, so the ``except``
    branch is not taken for errors raised by the gathered coroutines.
    """
    try:
        results = await asyncio.gather(
            coro_norm(),
            coro_value_error(),
            coro_type_error(),
            return_exceptions=True,
        )
    except (ValueError, TypeError) as e:
        print(f'{e=}')
        return
    print(f'{results=}')
asyncio.run(main())
TaskGroup with exception #
捕获所有异常 #
except* 用于捕获 TaskGroup 抛出的异常组(ExceptionGroup)中的各类异常
import asyncio
async def coro_norm():
    return 'Hello world'
async def coro_value_error():
    raise ValueError
async def coro_type_error():
    raise TypeError
async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(coro_norm())
            task2 = tg.create_task(coro_value_error())
            task3 = tg.create_task(coro_type_error())
        results = [task1.result(), task2.result(), task3.result()]
    except* ValueError as e:
        print(f'{e=}')
    except* TypeError as e:
        print(f'{e=}')
    else:
        print(f'{results=}')
asyncio.run(main())
组任务cancel #
gather cancel #
import asyncio
async def coro_norm():
    """Print a marker and succeed immediately with ``'Hello world'``."""
    print('coro_norm')
    return 'Hello world'


async def coro_value_error():
    """Fail immediately with ``ValueError``."""
    raise ValueError


async def coro_long():
    """Sleep for three seconds and return a result, cleaning up if cancelled.

    Raises:
        asyncio.CancelledError: re-raised unchanged after the cleanup message
            when the coroutine is cancelled during its sleep.
    """
    try:
        print('Long task is running...')
        await asyncio.sleep(3)
        return 'Long task result'
    except asyncio.CancelledError:
        print('All needed actions are done')
        # Bare ``raise`` propagates the original cancellation (message and
        # traceback intact) instead of replacing it with a fresh exception.
        raise
async def main():
    """Gather three tasks, cancel whatever is still pending, then print task states.

    ``gather`` surfaces the ``ValueError`` while the long task is still
    running; that task is cancelled by hand and given two seconds to unwind.
    """
    tasks = [
        asyncio.create_task(coro_norm()),
        asyncio.create_task(coro_value_error()),
        asyncio.create_task(coro_long(), name='coro_long'),
    ]
    try:
        results = await asyncio.gather(*tasks)
    except ValueError as e:
        print(f'{e=}')
    else:
        print(f'{results=}')
    for task in tasks:
        if task.done():
            continue
        task.cancel()
        print(f'Pending: {task.get_name()}')
    await asyncio.sleep(2)
    print()
    for task in tasks:
        print(f'{task._state}')
asyncio.run(main())
TaskGroup cancel with exception #
出错的任务会导致组内其余尚未完成的任务被取消
import asyncio
async def coro_norm():
    """Succeed immediately with ``'Hello world'``."""
    return 'Hello world'


async def coro_value_error():
    """Fail immediately with ``ValueError``."""
    raise ValueError


async def coro_long():
    """Sleep for two seconds and return a result, cleaning up if cancelled.

    Raises:
        asyncio.CancelledError: re-raised unchanged after the cleanup message
            when the coroutine is cancelled during its sleep.
    """
    try:
        print('Long task is running...')
        await asyncio.sleep(2)
        return 'Long task result'
    except asyncio.CancelledError:
        print('All needed actions are done')
        # Bare ``raise`` propagates the original cancellation (message and
        # traceback intact) instead of replacing it with a fresh exception.
        raise
async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(coro_norm())
            task2 = tg.create_task(coro_value_error())
            task3 = tg.create_task(coro_long())
        results = [task1.result(), task2.result(), task3.result()]
    except* ValueError as e:
        print(f'{e=}')
    except* TypeError as e:
        print(f'{e=}')
    else:
        print(f'{results=}')
asyncio.run(main())
# Long task is running...
# All needed actions are done
# e=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError()])
async for #
import asyncio
from redis import asyncio as aioredis
class A:
    """Synchronous iterator that yields 1, 2, 3 and restarts on every ``iter()``."""

    def __iter__(self):
        # Reset the counter so the same instance can be iterated again.
        self.x = 0
        return self

    def __next__(self):
        if self.x <= 2:
            self.x += 1
            return self.x
        raise StopIteration
class RedisReader:
    """Async iterator that looks up each of ``keys`` in Redis, one per step."""

    def __init__(self, redis, keys):
        self.redis = redis
        self.keys = keys

    def __aiter__(self):
        # A fresh key iterator per ``async for`` lets the reader be reused.
        self.ikeys = iter(self.keys)
        return self

    async def __anext__(self):
        exhausted = object()
        key = next(self.ikeys, exhausted)
        if key is exhausted:
            raise StopAsyncIteration
        async with self.redis.client() as connection:
            return await connection.get(key)
async def main():
    """Print the Redis value stored under each of a fixed list of keys.

    The client is closed in a ``finally`` block so the connection is released
    even if a lookup fails part-way through.
    """
    redis = await aioredis.from_url('redis://localhost')
    keys = ['morston', 'morgan', 'vader', 'lennon']
    try:
        async for name in RedisReader(redis, keys):
            print(name)
    finally:
        await redis.aclose()
asyncio.run(main())
async comprehension #
import asyncio
from faker import Faker
faker = Faker('en_US')
async def get_user(n=1):
    """Asynchronously yield ``n`` fake ``(name, surname)`` pairs.

    Each pair is produced after a 0.1 s pause. First and last names are
    requested separately because splitting a full generated name fails to
    unpack into two parts whenever Faker adds a prefix or suffix
    (for example ``'Mr. John Smith'``).
    """
    for _ in range(n):
        await asyncio.sleep(0.1)
        name = faker.first_name_male()
        surname = faker.last_name()
        yield name, surname
async def main():
    """Build a list, a dict and a set with async comprehensions over ``get_user``.

    Each comprehension consumes its own ``get_user(3)`` generator; only the
    set is printed.
    """
    # List comprehension: every item is whatever get_user yields.
    users_list = [name async for name in get_user(3)]
    # print(users_list)
    # Dict comprehension: unpacks each yielded pair into key and value.
    users_dict = {name: surname async for name, surname in get_user(3)}
    # print(users_dict)
    # Set comprehension over the yielded items.
    users_set = {name async for name in get_user(3)}
    print(users_set)
asyncio.run(main())
async context manager #
import asyncio
from contextlib import contextmanager, asynccontextmanager
from redis import asyncio as aioredis
@contextmanager
def custom_open(filename, mode='w'):
    """Open ``filename`` in ``mode`` and yield the file object.

    The file is closed when the ``with`` block exits, including when the
    block raises.
    """
    file_obj = open(filename, mode)
    try:
        yield file_obj
    finally:
        # Without try/finally an exception in the with-body would skip close().
        file_obj.close()
@asynccontextmanager
async def redis_connection():
    """Yield a Redis client for ``redis://localhost`` and close it on exit.

    The client is created before the ``try`` block: if creation itself fails
    there is nothing to close, and the original error propagates instead of
    an ``UnboundLocalError`` raised from the cleanup code.
    """
    redis = await aioredis.from_url('redis://localhost')
    try:
        yield redis
    finally:
        await redis.aclose()
async def main():
    """Store one key in Redis through the ``redis_connection`` context manager."""
    # Synchronous counterpart, kept for comparison:
    # with custom_open('file.txt') as f:
    #     f.write('Hello world')
    async with redis_connection() as redis:
        await redis.set('my_key', 'asyncio course')
asyncio.run(main())
async Queue #
import asyncio
from random import randint
class C:
    """ANSI escape sequences used to colour the console output."""

    blue = '\033[94m'
    green = '\033[92m'
    norm = '\033[0m'  # resets the colour


c = C()


async def producer(queue, name):
    """Put one random integer from 1 to 5 on ``queue`` and report it in blue."""
    timeout = randint(1, 5)
    await queue.put(timeout)
    print(f'{c.blue}Producer {name} put {timeout} to the queue: {queue}{c.norm}')


async def consumer(queue, name):
    """Forever take a number from ``queue``, sleep that many seconds and report it.

    ``task_done`` is called after each item so ``queue.join()`` can complete.
    """
    while True:
        timeout = await queue.get()
        await asyncio.sleep(timeout)
        print(f'{c.green}Consumer {name} ate {timeout}, queue: {queue}{c.norm}')
        queue.task_done()
async def main():
    """Run 12 producers and 4 consumers over a queue bounded at three items.

    Once every producer has finished and the queue has been fully processed,
    the endless consumers are cancelled and awaited so they are finished
    before ``main`` returns.
    """
    queue = asyncio.Queue(maxsize=3)
    producers = [
        asyncio.create_task(producer(queue, name=i))
        for i in range(12)
    ]
    consumers = [
        asyncio.create_task(consumer(queue, name=i))
        for i in range(4)
    ]
    await asyncio.gather(*producers)
    await queue.join()
    # Named ``worker`` rather than ``c`` so it does not shadow the colour holder.
    for worker in consumers:
        worker.cancel()
    # Collect the cancellations instead of leaving the tasks pending.
    await asyncio.gather(*consumers, return_exceptions=True)
asyncio.run(main())
asyncio Queue example #
import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import aiofiles
from bs4 import BeautifulSoup
async def make_request(url, session):
    """GET ``url`` with ``session`` and return the response if it is OK.

    Returns:
        The response when ``response.ok`` is true, otherwise ``None`` after
        printing the status and releasing the connection.
    """
    response = await session.get(url)
    if response.ok:
        return response
    print(f'{url} returned: {response.status}')
    # Hand the connection back to the pool; nobody will read this body.
    response.release()
    return None


async def get_image_page(queue, session):
    """Request a random comic and put the page URL it resolved to on ``queue``.

    Nothing is queued when the request fails.
    """
    url = 'https://c.xkcd.com/random/comic/'
    response = await make_request(url, session)
    if response is None:
        # make_request has already reported the failure.
        return
    await queue.put(response.url)


def _parse_link(html):
    """Return the absolute URL of the comic image found in ``html``.

    Prepends ``'https:'`` to the ``src`` of the first ``div#comic>img`` element.
    """
    soup = BeautifulSoup(html, 'lxml')
    image_link = 'https:' + soup.select_one('div#comic>img').get('src')
    return image_link
async def get_image_url(pages_queue, image_urls_queue, session):
    """Worker: turn comic page URLs into image URLs until cancelled.

    Every item taken from ``pages_queue`` is marked done even when processing
    fails, so ``pages_queue.join()`` cannot hang on a bad page. Failures are
    printed and the worker carries on with the next item.
    """
    loop = asyncio.get_running_loop()
    # One pool for the worker's lifetime instead of a new one per page.
    with ProcessPoolExecutor() as pool:
        while True:
            url = await pages_queue.get()
            try:
                response = await make_request(url, session)
                if response is None:
                    continue
                html = await response.text()
                # HTML parsing is CPU-bound, so it runs in a separate process.
                image_link = await loop.run_in_executor(pool, _parse_link, html)
                await image_urls_queue.put(image_link)
            except Exception as exc:
                print(f'{url} failed: {exc!r}')
            finally:
                pages_queue.task_done()


async def download_image(image_urls_queue, session):
    """Worker: download each queued image URL to the current directory.

    The file is named after the last path segment of the URL. Every item is
    marked done even when the download fails, so ``image_urls_queue.join()``
    cannot hang; failures are printed and the worker carries on.
    """
    while True:
        url = await image_urls_queue.get()
        try:
            response = await make_request(url, session)
            if response is None:
                continue
            filename = url.split('/')[-1]
            async with aiofiles.open(filename, 'wb') as f:
                async for chunk in response.content.iter_chunked(1024):
                    await f.write(chunk)
        except Exception as exc:
            print(f'{url} failed: {exc!r}')
        finally:
            image_urls_queue.task_done()
async def main():
    """Run the three-stage pipeline: page getters, URL getters, downloaders.

    Four tasks are started per stage. After the page getters finish and each
    queue has drained, the endless workers of that stage are cancelled. The
    session is managed by ``async with`` so it is closed even if a stage raises.
    """
    pages_queue = asyncio.Queue()
    image_urls_queue = asyncio.Queue()
    async with aiohttp.ClientSession() as session:
        page_getters = [
            asyncio.create_task(get_image_page(pages_queue, session))
            for _ in range(4)
        ]
        url_getters = [
            asyncio.create_task(
                get_image_url(pages_queue, image_urls_queue, session)
            )
            for _ in range(4)
        ]
        downloaders = [
            asyncio.create_task(download_image(image_urls_queue, session))
            for _ in range(4)
        ]
        await asyncio.gather(*page_getters)
        await pages_queue.join()
        # The page getters have already finished; the workers still looping
        # on pages_queue are the URL getters, so those are the ones to cancel.
        for getter in url_getters:
            getter.cancel()
        await image_urls_queue.join()
        for downloader in downloaders:
            downloader.cancel()
        # Let the cancelled workers unwind before the session is closed.
        await asyncio.gather(*url_getters, *downloaders, return_exceptions=True)
        print(image_urls_queue)
asyncio.run(main())
asyncio Queue example refactor #
import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import aiofiles
from bs4 import BeautifulSoup
async def make_request(url, session):
    """GET ``url`` with ``session`` and return the response if it is OK.

    Returns:
        The response when ``response.ok`` is true, otherwise ``None`` after
        printing the status and releasing the connection.
    """
    response = await session.get(url)
    if response.ok:
        return response
    print(f'{url} returned: {response.status}')
    # Hand the connection back to the pool; nobody will read this body.
    response.release()
    return None


async def get_image_page(queue, session):
    """Request a random comic and put the page URL it resolved to on ``queue``.

    Nothing is queued when the request fails.
    """
    url = 'https://c.xkcd.com/random/comic/'
    response = await make_request(url, session)
    if response is None:
        # make_request has already reported the failure.
        return
    await queue.put(response.url)


def _parse_link(html):
    """Return the absolute URL of the comic image found in ``html``.

    Prepends ``'https:'`` to the ``src`` of the first ``div#comic>img`` element.
    """
    soup = BeautifulSoup(html, 'lxml')
    image_link = 'https:' + soup.select_one('div#comic>img').get('src')
    return image_link
async def get_image_url(pages_queue, image_urls_queue, session):
    """Worker: turn comic page URLs into image URLs until cancelled.

    Every item taken from ``pages_queue`` is marked done even when processing
    fails, so ``pages_queue.join()`` cannot hang on a bad page. Failures are
    printed and the worker carries on with the next item.
    """
    loop = asyncio.get_running_loop()
    # One pool for the worker's lifetime instead of a new one per page.
    with ProcessPoolExecutor() as pool:
        while True:
            url = await pages_queue.get()
            try:
                response = await make_request(url, session)
                if response is None:
                    continue
                html = await response.text()
                # HTML parsing is CPU-bound, so it runs in a separate process.
                image_link = await loop.run_in_executor(pool, _parse_link, html)
                await image_urls_queue.put(image_link)
            except Exception as exc:
                print(f'{url} failed: {exc!r}')
            finally:
                pages_queue.task_done()


async def download_image(image_urls_queue, session):
    """Worker: download each queued image URL to the current directory.

    The file is named after the last path segment of the URL. Every item is
    marked done even when the download fails, so ``image_urls_queue.join()``
    cannot hang; failures are printed and the worker carries on.
    """
    while True:
        url = await image_urls_queue.get()
        try:
            response = await make_request(url, session)
            if response is None:
                continue
            filename = url.split('/')[-1]
            async with aiofiles.open(filename, 'wb') as f:
                async for chunk in response.content.iter_chunked(1024):
                    await f.write(chunk)
        except Exception as exc:
            print(f'{url} failed: {exc!r}')
        finally:
            image_urls_queue.task_done()
def cancel_tasks(tasks):
    """Request cancellation of every task in ``tasks``.

    A plain loop is used because the calls are made purely for their side
    effect; no throwaway list of return values is built.
    """
    for task in tasks:
        task.cancel()
def create_tasks(number_of_workers, coro, *args):
    """Start ``number_of_workers`` tasks, each running ``coro(*args)``.

    Must be called while an event loop is running.

    Returns:
        A list with every created task, in creation order; an empty list when
        ``number_of_workers`` is 0.
    """
    return [
        asyncio.create_task(coro(*args))
        for _ in range(number_of_workers)
    ]
async def main():
    """Run the three-stage pipeline: page getters, URL getters, downloaders.

    Four tasks are started per stage. After the page getters finish and each
    queue has drained, the endless workers of that stage are cancelled. The
    session is managed by ``async with`` so it is closed even if a stage raises.
    """
    pages_queue = asyncio.Queue()
    image_urls_queue = asyncio.Queue()
    async with aiohttp.ClientSession() as session:
        page_getters = create_tasks(4, get_image_page, pages_queue, session)
        url_getters = create_tasks(
            4, get_image_url, pages_queue, image_urls_queue, session
        )
        downloaders = create_tasks(4, download_image, image_urls_queue, session)
        await asyncio.gather(*page_getters)
        await pages_queue.join()
        # The page getters have already finished; the workers still looping
        # on pages_queue are the URL getters, so those are the ones to cancel.
        cancel_tasks(url_getters)
        await image_urls_queue.join()
        cancel_tasks(downloaders)
        # Let the cancelled workers unwind before the session is closed.
        await asyncio.gather(*url_getters, *downloaders, return_exceptions=True)
        print(image_urls_queue)
asyncio.run(main())
asyncio Lock #
import asyncio
import time
import aiohttp
lock = asyncio.Lock()
async def make_request(url):
    """GET ``url`` and print its JSON body while holding the module-level ``lock``.

    The lock is held for the request plus a 0.5 s pause, so concurrent callers
    go through one at a time.
    """
    async with aiohttp.ClientSession() as session, lock, session.get(url) as response:
        print(await response.json())
        await asyncio.sleep(0.5)


async def without_lock(url):
    """GET ``url`` and print its JSON body without taking the lock."""
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        print(await response.json())


async def get_data(url):
    """Thin wrapper that delegates to ``make_request``."""
    await make_request(url)


async def main():
    """Run 20 locked and 3 unlocked requests concurrently and print the elapsed time."""
    start = time.monotonic()
    locked = [
        asyncio.create_task(get_data('http://localhost:8000'))
        for _ in range(20)
    ]
    unlocked = [
        asyncio.create_task(without_lock('http://localhost:8000/hello'))
        for _ in range(3)
    ]
    await asyncio.gather(*locked, *unlocked)
    print(time.monotonic() - start)
asyncio.run(main())
asyncio Semaphore #
import asyncio
import time
import aiohttp
semaphore = asyncio.Semaphore(4)
def limit_rate(calls_limit=5, timeout=5):
    """Decorator factory: allow at most ``calls_limit`` call starts per ``timeout`` seconds.

    Each call takes one semaphore slot before running and gives it back
    ``timeout`` seconds after it started, regardless of how long the call
    itself takes.

    Args:
        calls_limit: number of slots in the semaphore.
        timeout: seconds a slot stays taken after a call starts.

    Returns:
        A decorator for coroutine functions.
    """
    import functools

    def wrapper(coro):
        semaphore = asyncio.Semaphore(calls_limit)
        # Keep a reference to every background task so none of them is
        # dropped before it has released its slot.
        pending_releases = set()

        async def wait():
            try:
                await asyncio.sleep(timeout)
            finally:
                # Also runs on cancellation, so a slot is never lost.
                semaphore.release()

        @functools.wraps(coro)
        async def inner_coro(*args, **kwargs):
            await semaphore.acquire()
            release_task = asyncio.create_task(wait())
            pending_releases.add(release_task)
            release_task.add_done_callback(pending_releases.discard)
            return await coro(*args, **kwargs)
        return inner_coro
    return wrapper
@limit_rate(calls_limit=5, timeout=5)
async def make_request(url):
    """GET ``url`` and print its JSON body, throttled twice.

    The decorator limits how often calls may start; the module-level
    ``semaphore`` additionally limits how many requests are in flight.
    """
    async with aiohttp.ClientSession() as session, semaphore, session.get(url) as response:
        print(await response.json())
        await asyncio.sleep(0.5)
        print('------')


async def get_data(url):
    """Thin wrapper that delegates to ``make_request``."""
    await make_request(url)


async def main():
    """Fire 20 requests concurrently and print the total elapsed time."""
    start = time.monotonic()
    requests = [
        asyncio.create_task(get_data('http://localhost:8000'))
        for _ in range(20)
    ]
    await asyncio.gather(*requests)
    print(time.monotonic() - start)
asyncio.run(main())
asyncio Event #
import asyncio
from random import randint
async def worker(event):
    """Wait until ``event`` is set, then print a random number from 1 to 5."""
    print('Before the wait()')
    await event.wait()
    if not event.is_set():
        return
    print(f'Event is set, random number: {randint(1, 5)}')


async def coro(event):
    """Sleep for a random 3 to 5 seconds, announce it, then set ``event``."""
    timeout = randint(3, 5)
    await asyncio.sleep(timeout)
    print(f'Event was set by coro after {timeout} sec.')
    event.set()


async def main():
    """Start five workers blocked on one event and a coroutine that sets it."""
    event = asyncio.Event()
    workers = [asyncio.create_task(worker(event)) for _ in range(5)]
    asyncio.create_task(coro(event))
    await asyncio.gather(*workers)
asyncio.run(main())
asyncio Condition #
import asyncio
from random import randint
async def waiter(condition, id):
    """Block on ``condition`` until notified, then print a random number from 1 to 5."""
    async with condition:
        print(f'Waiter {id} is awaiting')
        await condition.wait()
        num = randint(1, 5)
        print(f'Waiter {id} generated {num}')


async def starter(condition):
    """After five seconds, wake exactly two of the coroutines waiting on ``condition``."""
    print('Waiting for 5 seconds')
    await asyncio.sleep(5)
    async with condition:
        condition.notify(2)


async def main():
    """Start five waiters and a starter that notifies only two of them.

    Waiting on all five with ``gather`` would never return, because three
    waiters are never notified. Instead the starter is awaited, the woken
    waiters get up to one second to finish, and the rest are cancelled.
    """
    condition = asyncio.Condition()
    waiters = [
        asyncio.create_task(waiter(condition, id=i))
        for i in range(5)
    ]
    await asyncio.create_task(starter(condition))
    _, never_notified = await asyncio.wait(waiters, timeout=1)
    for task in never_notified:
        task.cancel()
    await asyncio.gather(*never_notified, return_exceptions=True)
asyncio.run(main())
asyncio Condition example #
import asyncio
import time
import aiohttp
async def make_request(url, condition):
    """Wait to be notified on ``condition``, then GET ``url`` and print its JSON body.

    The condition's lock is held during the request and the 0.5 s pause that
    follows it.
    """
    async with aiohttp.ClientSession() as session, condition:
        await condition.wait()
        async with session.get(url) as response:
            print(await response.json())
            await asyncio.sleep(0.5)


async def done(condition):
    """After five seconds, wake every coroutine waiting on ``condition``."""
    print('Will start after 5 seconds')
    await asyncio.sleep(5)
    async with condition:
        condition.notify_all()


async def without_lock(url):
    """GET ``url`` and print its JSON body without waiting on the condition."""
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        print(await response.json())


async def get_data(url, condition):
    """Thin wrapper that delegates to ``make_request``."""
    await make_request(url, condition)


async def main():
    """Run 20 condition-gated requests, 3 ungated ones and the notifier together."""
    condition = asyncio.Condition()
    gated = [
        asyncio.create_task(get_data('http://localhost:8000', condition))
        for _ in range(20)
    ]
    ungated = [
        asyncio.create_task(without_lock('http://localhost:8000/hello'))
        for _ in range(3)
    ]
    asyncio.create_task(done(condition))
    await asyncio.gather(*gated, *ungated)
asyncio.run(main())