asyncio及其接口
2025年5月2日
协程 #
import asyncio


async def one():
    """Return the constant 1 without ever suspending."""
    return 1


async def greet():
    """Pause for two seconds, then hand back a greeting."""
    await asyncio.sleep(2)
    return 'Hello world'


async def main():
    """Await both coroutines one after the other and print their results.

    Awaiting a coroutine object directly runs it to completion before the
    next statement, so greet() only starts after one() has returned.
    """
    outcomes = [await one(), await greet()]
    for value in outcomes:
        print(value)


asyncio.run(main())
Task #
Task的cancelled接口 #
import asyncio


async def c():
    """Trivial coroutine that finishes immediately with 1."""
    return 1


async def main():
    """Show that a task which ran to completion reports cancelled() == False."""
    finished = asyncio.create_task(c())
    await finished
    was_cancelled = finished.cancelled()
    print(was_cancelled)


asyncio.run(main())
create_task #
create_task() 方法用于创建一个新的 Task 对象,并将其调度到事件循环中。任务不会立即开始执行,而是要等到当前协程通过 await 让出控制权后,事件循环才会运行它。
import asyncio


async def one():
    """Return 1 immediately."""
    return 1


async def greet(timeout):
    """Print a start marker, sleep *timeout* seconds, then return a greeting."""
    print('run', timeout)
    await asyncio.sleep(timeout)
    return f'Hello world {timeout}'


async def main():
    """Schedule six tasks up front, then await their results in creation order.

    create_task() only schedules a coroutine. None of the tasks runs until
    main() gives up control at its first await (the sleep below). That is why
    'sleep 3' is printed before any 'run N' line. From then on all greets
    sleep concurrently, so the program takes about 4 s (the longest greet)
    instead of the 14 s the sleeps add up to.
    """
    res1 = asyncio.create_task(one())
    res2 = asyncio.create_task(greet(2))
    res3 = asyncio.create_task(greet(3))
    res4 = asyncio.create_task(greet(4))
    res5 = asyncio.create_task(greet(2))
    res6 = asyncio.create_task(greet(3))
    print('sleep 3')
    await asyncio.sleep(3)
    print('sleep end')
    print(await res1)
    print(await res2)
    print(await res3)
    print(await res4)
    print(await res5)
    print(await res6)


asyncio.run(main())
# sleep 3
# run 2
# run 3
# run 4
# run 2
# run 3
# sleep end
# 1
# Hello world 2
# Hello world 3
# Hello world 4
# Hello world 2
# Hello world 3
取消任务 #
主动取消捕获异常 #
import asyncio


async def greet(timeout):
    """Sleep for *timeout* seconds and return a greeting."""
    await asyncio.sleep(timeout)
    return 'Hello world'


async def main():
    """Poll a 6-second task once per second and cancel it at the 5-second mark."""
    long_task = asyncio.create_task(greet(6))
    elapsed = 0
    while True:
        if long_task.done():
            break
        await asyncio.sleep(1)
        elapsed += 1
        if elapsed == 5:
            # cancel() only *requests* cancellation; the task reacts at its
            # next suspension point, so the loop may tick one more time.
            long_task.cancel()
        print('Time passed:', elapsed)

    try:
        outcome = await long_task
    except asyncio.CancelledError:
        print('The long task cancelled')
    else:
        print(outcome)


asyncio.run(main())
设置超时时间并取消任务 #
import asyncio


async def greet(timeout):
    """Wait *timeout* seconds, then return a greeting."""
    await asyncio.sleep(timeout)
    return 'Hello world'


async def main():
    """Give a 6-second task only 3 seconds; wait_for() cancels it on timeout."""
    long_task = asyncio.create_task(greet(6))

    try:
        outcome = await asyncio.wait_for(long_task, timeout=3)
    except asyncio.TimeoutError:
        print('The long task took longer than 3 seconds')
    else:
        print(outcome)

    # wait_for() cancelled the task when it timed out, so awaiting it again
    # raises CancelledError instead of producing a result.
    try:
        outcome = await long_task
    except asyncio.CancelledError:
        print('The long task cancelled')
    else:
        print(outcome)


asyncio.run(main())
设置超时时间但不取消任务 #
import asyncio


async def greet(timeout):
    """Return a greeting after *timeout* seconds."""
    await asyncio.sleep(timeout)
    return 'Hello world'


async def main():
    """Time out waiting for a task without cancelling the task itself."""
    long_task = asyncio.create_task(greet(6))
    # shield() wraps the task. On timeout wait_for() cancels only the wrapper,
    # so long_task keeps running in the background.
    guarded = asyncio.shield(long_task)
    try:
        print(await asyncio.wait_for(guarded, timeout=3))
    except asyncio.TimeoutError:
        print('The long task took longer than 3 seconds')
    print(await long_task)


asyncio.run(main())
asyncio.run #
coroutine未执行结束 #
import asyncio


async def coro(message):
    """Print *message* twice, with a one-second pause in between."""
    for round_no in range(2):
        if round_no:
            await asyncio.sleep(1)
        print(message)


async def main():
    """Start a background task but return before it can finish.

    main() sleeps only 0.5 s, so the task is still inside its 1-second sleep
    when main() returns, and its second print never appears.
    """
    print('-- main beginning')
    asyncio.create_task(coro('text'))  # deliberately not awaited
    await asyncio.sleep(0.5)
    print('-- main done')


asyncio.run(main())
# -- main beginning
# text
# -- main done
coroutine执行结束 #
import asyncio


async def coro(message):
    """Say *message*, wait one second, say it again."""

    def say():
        print(message)

    say()
    await asyncio.sleep(1)
    say()


async def main():
    """Keep main() alive as long as the background task's own sleep.

    Both main() and coro() sleep 1 s. In the sample run below the task's
    second print still appears, right after '-- main done'.
    """
    print('-- main beginning')
    asyncio.create_task(coro('text'))  # deliberately not awaited
    await asyncio.sleep(1)
    print('-- main done')


asyncio.run(main())
# -- main beginning
# text
# -- main done
# text
查看所有任务 #
import asyncio


async def coro(message):
    """Echo *message* before and after a one-second nap."""
    for step in ('before', 'after'):
        print(message)
        if step == 'before':
            await asyncio.sleep(1)


async def main():
    """Print the live task set before and after spawning a background task."""

    def dump_tasks():
        # all_tasks() lists the running loop's unfinished tasks, including
        # the task that is executing main() itself.
        print(asyncio.all_tasks())

    dump_tasks()
    print('-- main beginning')
    asyncio.create_task(coro('text'))
    dump_tasks()
    await asyncio.sleep(0.5)
    print('-- main done')


asyncio.run(main())
# Sample output (file/line references come from the author's original script):
# {<Task pending name='Task-1' coro=<main() running at 2.py:11> cb=[_run_until_complete_cb() at base_events.py:184]>}
# -- main beginning
# {<Task pending name='Task-2' coro=<coro() running at 2.py:4>>, <Task pending name='Task-1' coro=<main() running at 2.py:15> cb=[_run_until_complete_cb() at base_events.py:184]>}
# text
# -- main done
ContextManager #
import asyncio
import aiohttp


class WriteToFile:
    """Minimal synchronous context manager that opens *filename* for writing."""

    def __init__(self, filename):
        self.filename = filename
        self.file_object = None

    def __enter__(self):
        self.file_object = open(self.filename, 'w')
        return self.file_object

    def __exit__(self, exc_type, exc_value, traceback):
        if self.file_object:
            self.file_object.close()
        # Returning None (falsy) lets any exception from the with-body propagate.


class AsyncSession:
    """Async context manager: open a ClientSession, GET *url*, yield the response.

    The session is closed on exit.
    """

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            response = await self.session.get(self.url)
        except BaseException:
            # __aexit__ is never invoked when __aenter__ raises, so close the
            # session here instead of leaking it.
            await self.session.close()
            raise
        return response

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()


async def check(url):
    """Fetch *url* and print its first 20 characters."""
    async with AsyncSession(url) as response:
        html = await response.text()
        print(f'{url}: {html[:20]}')


async def main():
    """Check three sites one after another (each task is awaited immediately)."""
    for url in ('https://facebook.com', 'https://youtube.com', 'https://google.com'):
        await asyncio.create_task(check(url))


asyncio.run(main())
group task by gather #
gather 捕获异常的任务, 等待所有任务执行结束 #
import asyncio
import aiohttp


class AsyncSession:
    """Async context manager: open a ClientSession, GET *url*, yield the response.

    The session is closed on exit.
    """

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            response = await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not called when __aenter__ fails; avoid leaking the session.
            await self.session.close()
            raise
        return response

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()


class ServerError(Exception):
    """Demo exception whose str() is the message it was created with."""

    def __init__(self, message):
        # Pass the message to Exception so args/repr carry it as well.
        super().__init__(message)
        self._message = message

    def __str__(self):
        return self._message


async def server_returns_error():
    """Simulate a backend call that fails after 3 seconds."""
    await asyncio.sleep(3)
    raise ServerError('Failed to get data')


async def check(url):
    """Return '<url>: <first 15 characters of the body>'."""
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    """Gather the fetches plus a failing call.

    With return_exceptions=True, gather() waits for everything and returns
    exceptions as ordinary results, in submission order.
    """
    coros = [
        check('https://www.baidu.com'),
        check('https://www.douyin.com'),
        check('https://www.bilibili.com'),
    ]
    results = await asyncio.gather(
        *coros,
        server_returns_error(),
        return_exceptions=True,
    )
    for res in results:
        print(res)


asyncio.run(main())
as_completed:每完成一个任务就立即处理其结果(按完成顺序,而不是像 gather 那样等待全部结束) #
import asyncio
import aiohttp


class AsyncSession:
    """Async context manager: open a ClientSession, GET *url*, yield the response.

    The session is closed on exit.
    """

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            response = await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not called when __aenter__ fails; avoid leaking the session.
            await self.session.close()
            raise
        return response

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()


class ServerError(Exception):
    """Demo exception whose str() is the message it was created with."""

    def __init__(self, message):
        super().__init__(message)
        self._message = message

    def __str__(self):
        return self._message


async def server_returns_error():
    """Simulate a backend call that fails after 3 seconds (unused in main here)."""
    await asyncio.sleep(3)
    raise ServerError('Failed to get data')


async def check(url):
    """Return '<url>: <first 15 characters of the body>'."""
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    """Print each result as soon as its fetch finishes (completion order)."""
    coros = [
        check('https://www.baidu.com'),
        check('https://www.douyin.com'),
        check('https://www.bilibili.com'),
    ]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print(result)
    # On Windows, give the transports a moment to close before the loop shuts
    # down, to avoid a loop-close exception (per the original note).
    await asyncio.sleep(1)


asyncio.run(main())
gather 嵌套 gather(分组执行任务) #
import asyncio
import aiohttp


class AsyncSession:
    """Async context manager: open a ClientSession, GET *url*, yield the response.

    The session is closed on exit.
    """

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            response = await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not called when __aenter__ fails; avoid leaking the session.
            await self.session.close()
            raise
        return response

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()


class ServerError(Exception):
    """Demo exception whose str() is the message it was created with."""

    def __init__(self, message):
        super().__init__(message)
        self._message = message

    def __str__(self):
        return self._message


async def server_returns_error():
    """Simulate a backend call that fails after 3 seconds (unused in main here)."""
    await asyncio.sleep(3)
    raise ServerError('Failed to get data')


async def check(url):
    """Return '<url>: <first 15 characters of the body>'."""
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    """Group fetches with gather() and gather the groups again.

    The outer result is a list with one list of results per group.
    """
    group1 = asyncio.gather(
        check('https://www.baidu.com'),
        check('https://www.sina.com'),
    )
    group2 = asyncio.gather(
        check('https://www.douban.com'),
        check('https://www.bilibili.com'),
    )
    print(type(group2))
    groups = asyncio.gather(group1, group2)
    results = await groups
    for res in results:
        print(res)
    # Short pause so transports can close before the loop shuts down.
    await asyncio.sleep(1)


asyncio.run(main())
non-obvious issue #
没有异步(nothing() 中没有真正挂起的 await,协程不会让出控制权,因此两个任务不会交替执行) #
import asyncio


async def nothing():
    """Print 'Busy'. Nothing in here ever suspends."""
    print('Busy')


async def busy_loop():
    """Call nothing() ten times back to back.

    nothing() contains no await, so awaiting it never hands control back to
    the event loop. Replacing this bounded loop with `while True` would
    therefore starve normal() forever.
    """
    remaining = 10
    while remaining:
        await nothing()
        remaining -= 1


async def normal():
    """Print 'Normal coroutine' ten times without suspending."""
    for _ in range(10):
        print('Normal coroutine')


async def main():
    """Run both via gather(); because neither yields, their output does not interleave."""
    pair = (busy_loop(), normal())
    await asyncio.gather(*pair)


asyncio.run(main())
有异步 #
import asyncio


async def nothing():
    """Print 'Busy', then yield to the event loop once via sleep(0)."""
    print('Busy')
    await asyncio.sleep(0)


async def busy_loop():
    """Ten cooperative 'Busy' steps."""
    for _ in range(10):
        await nothing()


async def normal():
    """Ten cooperative 'Normal coroutine' steps, yielding after each one."""
    count = 0
    while count < 10:
        print('Normal coroutine')
        await asyncio.sleep(0)
        count += 1


async def main():
    """Create both tasks first, then wait for them.

    Both tasks exist before main() suspends and each yields with sleep(0),
    so the loop alternates between them: Busy, Normal, Busy, ...
    """
    running = [asyncio.create_task(fn()) for fn in (busy_loop, normal)]
    for task in running:
        await task


asyncio.run(main())
没有异步 #
import asyncio


async def nothing():
    """Print 'Busy' and yield once to the event loop."""
    print('Busy')
    await asyncio.sleep(0)


async def busy_loop():
    """Ten cooperative 'Busy' steps."""
    left = 10
    while left > 0:
        await nothing()
        left -= 1


async def normal():
    """Ten cooperative 'Normal coroutine' steps."""
    for _ in range(10):
        print('Normal coroutine')
        await asyncio.sleep(0)


async def main():
    """Create and await each task in turn, so there is never a second task to switch to."""
    for fn in (busy_loop, normal):
        await asyncio.create_task(fn())


asyncio.run(main())
TaskGroup #
需要 Python 3.11 及以上版本
import asyncio
import aiohttp


class AsyncSession:
    """Async context manager: open a ClientSession, GET *url*, yield the response.

    The session is closed on exit.
    """

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        try:
            response = await self.session.get(self.url)
        except BaseException:
            # __aexit__ is not called when __aenter__ fails; avoid leaking the session.
            await self.session.close()
            raise
        return response

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.session.close()


async def check(url):
    """Return '<url>: <first 15 characters of the body>'."""
    async with AsyncSession(url) as response:
        html = await response.text()
        return f'{url}: {html[:15]}'


async def main():
    """Run three fetches in a TaskGroup; leaving the block waits for all of them."""
    async with asyncio.TaskGroup() as tg:
        print(type(tg))
        print(dir(tg))
        print()
        task1 = tg.create_task(check('https://www.baidu.com'))
        task2 = tg.create_task(check('https://www.sina.com'))
        task3 = tg.create_task(check('https://www.bilibili.com'))
    # These must stay outside the `async with`: inside it the tasks have not
    # finished yet, so result() would fail.
    print(task1.result())
    print(task2.result())
    print(task3.result())


asyncio.run(main())
gather with exception #
只能捕获第一个异常 #
import asyncio


async def coro_norm():
    """Succeed with a greeting."""
    return 'Hello world'


async def coro_value_error():
    """Fail immediately with a bare ValueError."""
    raise ValueError


async def coro_type_error():
    """Fail immediately with a bare TypeError."""
    raise TypeError


async def main():
    """Show that gather() without return_exceptions surfaces only one exception.

    The first failure to complete propagates to the awaiting code; the other
    failure is never seen here.
    """
    awaitables = (coro_norm(), coro_value_error(), coro_type_error())
    try:
        results = await asyncio.gather(*awaitables)
    except (ValueError, TypeError) as e:
        print(f'{e=}')
    else:
        print(f'{results=}')


asyncio.run(main())
不触发异常(return_exceptions=True 时,异常会作为结果返回,而不会抛出) #
import asyncio


async def coro_norm():
    """Return a greeting."""
    return 'Hello world'


async def coro_value_error():
    """Raise ValueError straight away."""
    raise ValueError


async def coro_type_error():
    """Raise TypeError straight away."""
    raise TypeError


async def main():
    """With return_exceptions=True, exceptions come back as list entries.

    gather() therefore does not raise here, and the except clause is kept
    only to contrast with the previous example.
    """
    awaitables = (coro_norm(), coro_value_error(), coro_type_error())
    try:
        results = await asyncio.gather(*awaitables, return_exceptions=True)
    except (ValueError, TypeError) as e:
        print(f'{e=}')
    else:
        print(f'{results=}')


asyncio.run(main())
TaskGroup with exception #
捕获所有异常 #
使用 except* 分别捕获 TaskGroup 抛出的 ExceptionGroup 中的各类异常
import asyncio
async def coro_norm():
return 'Hello world'
async def coro_value_error():
raise ValueError
async def coro_type_error():
raise TypeError
async def main():
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(coro_norm())
task2 = tg.create_task(coro_value_error())
task3 = tg.create_task(coro_type_error())
results = [task1.result(), task2.result(), task3.result()]
except* ValueError as e:
print(f'{e=}')
except* TypeError as e:
print(f'{e=}')
else:
print(f'{results=}')
asyncio.run(main())
组任务 cancel #
gather cancel #
import asyncio


async def coro_norm():
    """Print a marker and return a greeting."""
    print('coro_norm')
    return 'Hello world'


async def coro_value_error():
    """Fail immediately with ValueError."""
    raise ValueError


async def coro_long():
    """Sleep 3 s; on cancellation, clean up and let the cancellation propagate."""
    try:
        print('Long task is running...')
        await asyncio.sleep(3)
        return 'Long task result'
    except asyncio.CancelledError:
        print('All needed actions are done')
        # Bare raise re-raises the original CancelledError (keeping its
        # message and traceback) instead of creating a new one.
        raise


def _task_state(task):
    """Return 'PENDING', 'CANCELLED' or 'FINISHED' using only public Task APIs."""
    if not task.done():
        return 'PENDING'
    return 'CANCELLED' if task.cancelled() else 'FINISHED'


async def main():
    """gather() over Tasks: the first error propagates, but the others keep running.

    Any task still pending after the error is cancelled explicitly, and main()
    waits until those cancellations have been processed before reporting
    the final states.
    """
    task1 = asyncio.create_task(coro_norm())
    task2 = asyncio.create_task(coro_value_error())
    task3 = asyncio.create_task(coro_long(), name='coro_long')
    tasks = [task1, task2, task3]
    try:
        results = await asyncio.gather(*tasks)
    except ValueError as e:
        print(f'{e=}')
    else:
        print(f'{results=}')
    for task in tasks:
        if not task.done():
            task.cancel()
            print(f'Pending: {task.get_name()}')
    # Wait for the cancellations to finish instead of sleeping an arbitrary
    # amount of time. return_exceptions=True stops CancelledError/ValueError
    # from being re-raised here.
    await asyncio.gather(*tasks, return_exceptions=True)
    print()
    print(f'{_task_state(task1)}')
    print(f'{_task_state(task2)}')
    print(f'{_task_state(task3)}')


asyncio.run(main())
TaskGroup cancel with exception #
TaskGroup 中任一任务出错,都会取消组内其余所有尚未完成的任务
import asyncio
async def coro_norm():
return 'Hello world'
async def coro_value_error():
raise ValueError
async def coro_long():
try:
print('Long task is running...')
await asyncio.sleep(2)
return 'Long task result'
except asyncio.CancelledError:
print('All needed actions are done')
raise asyncio.CancelledError
async def main():
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(coro_norm())
task2 = tg.create_task(coro_value_error())
task3 = tg.create_task(coro_long())
results = [task1.result(), task2.result(), task3.result()]
except* ValueError as e:
print(f'{e=}')
except* TypeError as e:
print(f'{e=}')
else:
print(f'{results=}')
asyncio.run(main())
# Long task is running...
# All needed actions are done
# e=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError()])
async for #
import asyncio
from redis import asyncio as aioredis


class A:
    """Synchronous iterator yielding 1, 2, 3; the sync counterpart of RedisReader."""

    def __iter__(self):
        self.x = 0
        return self

    def __next__(self):
        if self.x > 2:
            raise StopIteration
        self.x += 1
        return self.x


class RedisReader:
    """Async iterator yielding the Redis value stored under each key in *keys*, in order."""

    def __init__(self, redis, keys):
        self.redis = redis
        self.keys = keys

    def __aiter__(self):
        self.ikeys = iter(self.keys)
        return self

    async def __anext__(self):
        try:
            key = next(self.ikeys)
        except StopIteration:
            # An async iterator signals exhaustion with StopAsyncIteration;
            # `from None` hides the irrelevant StopIteration context.
            raise StopAsyncIteration from None
        async with self.redis.client() as connection:
            value = await connection.get(key)
        return value


async def main():
    """Print the value of each demo key, then close the Redis client."""
    redis = await aioredis.from_url('redis://localhost')
    try:
        keys = ['morston', 'morgan', 'vader', 'lennon']
        async for name in RedisReader(redis, keys):
            print(name)
    finally:
        # Previously the client was never closed.
        await redis.aclose()


asyncio.run(main())
async comprehension #
import asyncio
from faker import Faker

faker = Faker('en_US')


async def get_user(n=1):
    """Async generator yielding *n* random (first name, surname) pairs, 0.1 s apart."""
    for _ in range(n):
        await asyncio.sleep(0.1)
        # name_male().split() breaks on names that carry a prefix or suffix
        # (e.g. "Dr. John Smith", "John Smith Jr."): three parts cannot be
        # unpacked into two. Ask Faker for the two parts separately instead.
        name, surname = faker.first_name_male(), faker.last_name()
        yield name, surname


async def main():
    """Demonstrate async list/dict/set comprehensions over get_user()."""
    users_list = [user async for user in get_user(3)]
    # print(users_list)
    users_dict = {name: surname async for name, surname in get_user(3)}
    # print(users_dict)
    users_set = {user async for user in get_user(3)}
    print(users_set)


asyncio.run(main())
async context manager #
import asyncio
from contextlib import contextmanager, asynccontextmanager
from redis import asyncio as aioredis


@contextmanager
def custom_open(filename, mode='w'):
    """Generator-based equivalent of open(): yield the file, always close it."""
    file_obj = open(filename, mode)
    try:
        yield file_obj
    finally:
        # Runs even when the with-body raises, so the file is never left open.
        file_obj.close()


@asynccontextmanager
async def redis_connection():
    """Yield a Redis client for redis://localhost and close it on exit."""
    # Create the client outside the try block. If this call raises there is
    # nothing to close, and `redis` would otherwise be unbound in `finally`.
    redis = await aioredis.from_url('redis://localhost')
    try:
        yield redis
    finally:
        await redis.aclose()


async def main():
    """Store one key through the async context manager."""
    # Synchronous counterpart:
    # with custom_open('file.txt') as f:
    #     f.write('Hello world')
    async with redis_connection() as redis:
        await redis.set('my_key', 'asyncio course')


asyncio.run(main())
async Queue #
import asyncio
from random import randint
class C:
norm = '\033[0m'
blue = '\033[94m'
green = '\033[92m'
c = C()
async def producer(queue, name):
timeout = randint(1, 5)
await queue.put(timeout)
print(f'{c.blue}Producer {name} put {timeout} to the queue: {queue}{c.norm}')
async def consumer(queue, name):
while True:
timeout = await queue.get()
await asyncio.sleep(timeout)
print(f'{c.green}Consumer {name} ate {timeout}, queue: {queue}{c.norm}')
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=3)
producers = []
for i in range(12):
task = asyncio.create_task(producer(queue, name=i))
producers.append(task)
consumers = []
for i in range(4):
task = asyncio.create_task(consumer(queue, name=i))
consumers.append(task)
await asyncio.gather(*producers)
await queue.join()
for c in consumers:
c.cancel()
asyncio.run(main())
asyncio Queue example #
import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import aiofiles
from bs4 import BeautifulSoup


async def make_request(url, session):
    """GET *url* with *session*.

    Returns the response when its status is OK. Otherwise prints the status
    and returns None, so callers must check for None.
    """
    response = await session.get(url)
    if response.ok:
        return response
    print(f'{url} returned: {response.status}')
    return None


async def get_image_page(queue, session):
    """Follow xkcd's random-comic URL and enqueue the final page URL."""
    url = 'https://c.xkcd.com/random/comic/'
    response = await make_request(url, session)
    if response is not None:
        # response.url is the URL after redirects, i.e. the concrete comic page.
        await queue.put(response.url)


def _parse_link(html):
    """Return the absolute comic image URL found in *html*, or None if absent.

    Module-level so it can be pickled and run in a worker process.
    """
    soup = BeautifulSoup(html, 'lxml')
    img = soup.select_one('div#comic>img')
    if img is None or not img.get('src'):
        return None
    # src is presumably scheme-relative ("//..."), hence the 'https:' prefix.
    return 'https:' + img.get('src')


async def _parse_in_process(pool, html):
    """Run _parse_link(html) in *pool*, or in a one-off ProcessPoolExecutor if pool is None."""
    loop = asyncio.get_running_loop()
    if pool is not None:
        return await loop.run_in_executor(pool, _parse_link, html)
    with ProcessPoolExecutor() as own_pool:
        return await loop.run_in_executor(own_pool, _parse_link, html)


async def get_image_url(pages_queue, image_urls_queue, session, pool=None):
    """Worker: turn comic page URLs from *pages_queue* into image URLs.

    Runs until cancelled. *pool* is an optional shared ProcessPoolExecutor
    for the CPU-bound HTML parsing. Without it, a new process pool is created
    for every page, as before.
    """
    while True:
        url = await pages_queue.get()
        try:
            response = await make_request(url, session)
            if response is None:
                continue
            html = await response.text()
            image_link = await _parse_in_process(pool, html)
            if image_link is not None:
                await image_urls_queue.put(image_link)
        except Exception as exc:
            # Report and keep the worker alive; one bad page must not kill it.
            print(f'{url} failed: {exc!r}')
        finally:
            # Always acknowledge the item, otherwise pages_queue.join() hangs.
            pages_queue.task_done()


async def download_image(image_urls_queue, session):
    """Worker: download each image URL from the queue into the current directory."""
    while True:
        url = await image_urls_queue.get()
        try:
            response = await make_request(url, session)
            if response is None:
                continue
            filename = url.split('/')[-1]
            async with aiofiles.open(filename, 'wb') as f:
                async for chunk in response.content.iter_chunked(1024):
                    await f.write(chunk)
        except Exception as exc:
            print(f'{url} failed: {exc!r}')
        finally:
            image_urls_queue.task_done()


async def main():
    """Three-stage pipeline: random page URLs -> image URLs -> downloaded files."""
    pages_queue = asyncio.Queue()
    image_urls_queue = asyncio.Queue()
    async with aiohttp.ClientSession() as session:
        # A single process pool shared by all parsers instead of one per page.
        with ProcessPoolExecutor() as pool:
            page_getters = [
                asyncio.create_task(get_image_page(pages_queue, session))
                for _ in range(4)
            ]
            url_getters = [
                asyncio.create_task(
                    get_image_url(pages_queue, image_urls_queue, session, pool)
                )
                for _ in range(4)
            ]
            downloaders = [
                asyncio.create_task(download_image(image_urls_queue, session))
                for _ in range(4)
            ]
            await asyncio.gather(*page_getters)
            await pages_queue.join()
            # Stop the page->image-URL workers (previously the already
            # finished page_getters were "cancelled" by mistake).
            for getter in url_getters:
                getter.cancel()
            await image_urls_queue.join()
            for downloader in downloaders:
                downloader.cancel()
            await asyncio.gather(*url_getters, *downloaders, return_exceptions=True)
            print(image_urls_queue)


# ProcessPoolExecutor may start workers by re-importing this module (the
# "spawn" start method, see the multiprocessing docs); the guard stops those
# workers from running main() again.
if __name__ == '__main__':
    asyncio.run(main())
asyncio Queue example refactor #
import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import aiofiles
from bs4 import BeautifulSoup


async def make_request(url, session):
    """GET *url* with *session*; return the response, or None (after printing) on a bad status."""
    response = await session.get(url)
    if response.ok:
        return response
    print(f'{url} returned: {response.status}')
    return None


async def get_image_page(queue, session):
    """Follow xkcd's random-comic URL and enqueue the resulting page URL."""
    url = 'https://c.xkcd.com/random/comic/'
    response = await make_request(url, session)
    if response is not None:
        await queue.put(response.url)


def _parse_link(html):
    """Return the absolute comic image URL in *html*, or None if absent (picklable, module-level)."""
    soup = BeautifulSoup(html, 'lxml')
    img = soup.select_one('div#comic>img')
    if img is None or not img.get('src'):
        return None
    return 'https:' + img.get('src')


async def _parse_in_process(pool, html):
    """Run _parse_link(html) in *pool*, or in a one-off ProcessPoolExecutor if pool is None."""
    loop = asyncio.get_running_loop()
    if pool is not None:
        return await loop.run_in_executor(pool, _parse_link, html)
    with ProcessPoolExecutor() as own_pool:
        return await loop.run_in_executor(own_pool, _parse_link, html)


async def get_image_url(pages_queue, image_urls_queue, session, pool=None):
    """Worker: page URL -> image URL. Runs until cancelled; *pool* optionally shares a process pool."""
    while True:
        url = await pages_queue.get()
        try:
            response = await make_request(url, session)
            if response is None:
                continue
            html = await response.text()
            image_link = await _parse_in_process(pool, html)
            if image_link is not None:
                await image_urls_queue.put(image_link)
        except Exception as exc:
            print(f'{url} failed: {exc!r}')
        finally:
            # Acknowledge every item so pages_queue.join() cannot hang.
            pages_queue.task_done()


async def download_image(image_urls_queue, session):
    """Worker: download each queued image URL into the current directory."""
    while True:
        url = await image_urls_queue.get()
        try:
            response = await make_request(url, session)
            if response is None:
                continue
            filename = url.split('/')[-1]
            async with aiofiles.open(filename, 'wb') as f:
                async for chunk in response.content.iter_chunked(1024):
                    await f.write(chunk)
        except Exception as exc:
            print(f'{url} failed: {exc!r}')
        finally:
            image_urls_queue.task_done()


def cancel_tasks(tasks):
    """Request cancellation of every task in *tasks*."""
    # A plain loop: a list comprehension would build a throw-away list of Nones.
    for task in tasks:
        task.cancel()


def create_tasks(number_of_workers, coro, *args):
    """Start *number_of_workers* tasks each running coro(*args); return them as a list."""
    return [asyncio.create_task(coro(*args)) for _ in range(number_of_workers)]


async def main():
    """Same pipeline as before, built with the create_tasks/cancel_tasks helpers."""
    pages_queue = asyncio.Queue()
    image_urls_queue = asyncio.Queue()
    async with aiohttp.ClientSession() as session:
        with ProcessPoolExecutor() as pool:
            page_getters = create_tasks(4, get_image_page, pages_queue, session)
            url_getters = create_tasks(
                4, get_image_url, pages_queue, image_urls_queue, session, pool
            )
            downloaders = create_tasks(4, download_image, image_urls_queue, session)
            await asyncio.gather(*page_getters)
            await pages_queue.join()
            # Cancel the URL workers (not the already-finished page getters).
            cancel_tasks(url_getters)
            await image_urls_queue.join()
            cancel_tasks(downloaders)
            await asyncio.gather(*url_getters, *downloaders, return_exceptions=True)
            print(image_urls_queue)


# Required because ProcessPoolExecutor may re-import this module in its
# worker processes (spawn start method).
if __name__ == '__main__':
    asyncio.run(main())
asyncio Lock #
import asyncio
import time
import aiohttp

# Shared by every make_request() call, so only one of them holds it at a time.
lock = asyncio.Lock()


async def make_request(url):
    """GET *url* while holding the shared lock, print the JSON, then pause 0.5 s.

    The pause happens while the lock is still held, which serializes the
    locked requests.
    """
    async with (
        aiohttp.ClientSession() as session,
        lock,
        session.get(url) as response,
    ):
        payload = await response.json()
        print(payload)
        await asyncio.sleep(0.5)


async def without_lock(url):
    """GET *url* and print the JSON body without touching the lock."""
    async with (
        aiohttp.ClientSession() as session,
        session.get(url) as response,
    ):
        print(await response.json())


async def get_data(url):
    """Thin wrapper around make_request()."""
    await make_request(url)


async def main():
    """Fire 20 locked and 3 unlocked requests together and report the elapsed time."""
    started = time.monotonic()
    locked = [
        asyncio.create_task(get_data('http://localhost:8000'))
        for _ in range(20)
    ]
    unlocked = [
        asyncio.create_task(without_lock('http://localhost:8000/hello'))
        for _ in range(3)
    ]
    await asyncio.gather(*locked, *unlocked)
    print(time.monotonic() - started)


asyncio.run(main())
asyncio Semaphore #
import asyncio
import functools
import time
import aiohttp

# Caps concurrent HTTP requests inside make_request() at 4.
semaphore = asyncio.Semaphore(4)


def limit_rate(calls_limit=5, timeout=5):
    """Decorator factory: let at most *calls_limit* calls start per *timeout* seconds.

    Each call takes a slot from a private semaphore. The slot is handed back
    *timeout* seconds after the call *started* (not when it finished), which
    turns the semaphore into a sliding-window rate limiter.
    """
    def wrapper(coro):
        # Named `calls` so it does not shadow the module-level `semaphore`.
        calls = asyncio.Semaphore(calls_limit)
        # Strong references to pending release timers. The event loop keeps
        # only weak references to tasks, so an unreferenced task could be
        # garbage-collected before it releases its slot.
        release_timers = set()

        async def wait():
            try:
                await asyncio.sleep(timeout)
            finally:
                calls.release()

        @functools.wraps(coro)
        async def inner_coro(*args, **kwargs):
            await calls.acquire()
            timer = asyncio.create_task(wait())
            release_timers.add(timer)
            timer.add_done_callback(release_timers.discard)
            return await coro(*args, **kwargs)

        return inner_coro

    return wrapper


@limit_rate(calls_limit=5, timeout=5)
async def make_request(url):
    """GET *url* (at most 4 concurrently), print the JSON, then pause 0.5 s."""
    async with aiohttp.ClientSession() as session:
        async with semaphore:
            async with session.get(url) as response:
                data = await response.json()
                print(data)
                await asyncio.sleep(0.5)
                print('------')


async def get_data(url):
    """Thin wrapper around the rate-limited make_request()."""
    await make_request(url)


async def main():
    """Issue 20 rate-limited requests and report the elapsed time."""
    start = time.monotonic()
    tasks = [
        asyncio.create_task(get_data('http://localhost:8000'))
        for _ in range(20)
    ]
    await asyncio.gather(*tasks)
    print(time.monotonic() - start)


asyncio.run(main())
asyncio Event #
import asyncio
from random import randint
async def worker(event):
print('Before the wait()')
await event.wait()
if event.is_set():
print(f'Event is set, random number: {randint(1, 5)}')
async def coro(event):
timeout = randint(3, 5)
await asyncio.sleep(timeout)
print(f'Event was set by coro after {timeout} sec.')
event.set()
async def main():
event = asyncio.Event()
tasks = [
asyncio.create_task(worker(event))
for _ in range(5)
]
asyncio.create_task(coro(event))
await asyncio.gather(*tasks)
asyncio.run(main())
asyncio Condition #
import asyncio
from random import randint


async def waiter(condition, id):
    """Wait on *condition*; once notified, print a random number."""
    async with condition:
        print(f'Waiter {id} is awaiting')
        await condition.wait()
        num = randint(1, 5)
        print(f'Waiter {id} generated {num}')


async def starter(condition):
    """After 5 s wake exactly two waiters; a second later wake everyone still waiting."""
    print('Waiting for 5 seconds')
    await asyncio.sleep(5)
    async with condition:
        condition.notify(2)  # only two waiters proceed here
    # Previously the remaining waiters were never notified, so main()'s
    # gather() over all of them blocked forever. Release them after a pause
    # so the notify(2) effect stays visible.
    await asyncio.sleep(1)
    async with condition:
        condition.notify_all()


async def main():
    """Five waiters share one Condition that starter() notifies."""
    condition = asyncio.Condition()
    waiters = [
        asyncio.create_task(waiter(condition, id=i))
        for i in range(5)
    ]
    notifier = asyncio.create_task(starter(condition))
    await asyncio.gather(*waiters, notifier)


asyncio.run(main())
asyncio Condition example #
import asyncio
import time
import aiohttp


async def make_request(url, condition):
    """Wait for *condition* to be notified, then GET *url* while holding its lock.

    The request and the 0.5 s pause happen while the condition's lock is
    held, so the woken requests run one at a time.
    """
    async with aiohttp.ClientSession() as session:
        async with condition:
            await condition.wait()
            async with session.get(url) as response:
                data = await response.json()
                print(data)
                await asyncio.sleep(0.5)


async def done(condition):
    """After 5 s, wake every coroutine waiting on *condition*."""
    print('Will start after 5 seconds')
    await asyncio.sleep(5)
    async with condition:
        condition.notify_all()


async def without_lock(url):
    """GET *url* and print the JSON body without touching the condition."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
            print(data)


async def get_data(url, condition):
    """Thin wrapper around make_request()."""
    await make_request(url, condition)


async def main():
    """20 requests gated by a Condition plus 3 ungated ones."""
    condition = asyncio.Condition()
    tasks = [
        asyncio.create_task(
            get_data(
                'http://localhost:8000',
                condition,
            )
        )
        for _ in range(20)
    ]
    wo_locks = [
        asyncio.create_task(
            without_lock('http://localhost:8000/hello')
        )
        for _ in range(3)
    ]
    # Keep a reference to the notifier task (the loop only holds weak
    # references) and wait for it along with the others.
    releaser = asyncio.create_task(done(condition))
    await asyncio.gather(*tasks, *wo_locks, releaser)


asyncio.run(main())