### Что такое стандарт asyncio?
- Фундамент для асинхронных фреймворков
- Базовые абстракции (Future/Coroutine/Task/AbstractEventLoop)
- Высокоуровневый API - Сопрограммы (coroutine, generator coroutine), задачи (Task). Streams, примитивы для синхронизации, Queues. API для работы с процессами и межпроцессного взаимодействия.
- Низкоуровневый API. loop.*, asyncio.Future. Транспорты и протоколы\
### Что такое awaitable-объекты
- Можно использовать в выражении await
- Использование объекта в выражении await означает, что текущая
сопрограмма переключит контекст и будет ожидать, пока выражение
не будет выполнено
- Существует 3 встроенных типа awaitable объектов: coroutine, Task,
Future
### Что такое сопрограмма?
Методика связи программных модулей друг с другом по принципу кооперативной многозадачности: модуль приостанавливается в определённой точке, сохраняя полное состояние (включая стек вызовов и счётчик команд), и передаёт управление другому. Тот, в свою очередь, выполняет задачу и передаёт управление обратно, сохраняя свои стек и счётчик.
### Что такое объект coroutine в python?
Объект, который имеет ряд инструкций, умеет хранить свое состояние
и может переключать контекст (передавать управление)
Является более обобщенной формой подпрограмм (интерпретатор
может выходить в подпрограмму в 1 точке и выходить в другой. В
сопрограммах может быть несколько точек входа, выхода и возврата).
Простыми словами - ГЕНЕРАТОР. А в 3.5+ специальный объект который ведет себя как генератор
### Что такое coroutine function?
- Функция, возвращающая объект coroutine
- Определяется ключевыми словами async def, может содержать ключевые слова await, async for, async with
```
import asyncio
# <class 'function'> это coroutine function
async def actual_coro():
print('hello')
await asyncio.sleep(1)
print('world')
# <class 'coroutine'> это coroutine object
coro = actual_coro()
```
В данном случае контекст переключает `await`
### Как запустить сопрограмму?
1. asyncio.run()
```
import asyncio
async def get_text():
return 'hello, world!'
async def say_text():
text = await get_text()
await asyncio.sleep(1)
return text
result = asyncio.run(say_text())
print(result) # hello, world!
```
asyncio.run(): запуск event loop ожидания окончания работы
асинхронной функции, завершение работы event loop и
отмена всех порожденных асинхронных задач
await: запуск из асинхронного кода с явным переключением контекста: await
2. asyncio.create_task(): запуск задачи в фоновом режиме
```
async def get_text(delay, text):
await asyncio.sleep(delay)
return text
async def say_text():
task1 = asyncio.create_task(get_text(1, 'hello'))
task2 = asyncio.create_task(get_text(1, 'world'))
await task1
await task2
return ', '.join([task1.result(), task2.result()])
result = asyncio.run(say_text())
print(result) # hello, world
```
### Что такое asyncio.Task и зачем он нужен?
- Представляет из себя сопрограмму, запущенную или запланированную для запуска в цикле событий и контекст
- Позволяет запускать задачи в фоновом режиме
- Создается с помощью asyncio.create_task() или loop.create_task(), которые оборачивают сопрограмму в объект Task и планирует ее выполнение в цикле событий на ближайшее время
- Абстракция, позволяющая отменить/прервать выполнение сопрограммы с помощью .cancel()
### Что такое asyncio.Future?
- Cпециальный низкоуровневый awaitable объект, представляющий конечный результат выполнения асинхронной операции
- Позволяет использовать низкоуровневый код, реализованный на функциях обратного вызова с высокоуровневым кодом на async/await
- Создается с помощью loop.create_future()
```
import asyncio
async def my_sleep(delay):
loop = asyncio.get_running_loop()
future = loop.create_future()
loop.call_later(delay, future.set_result, True)
await future
async def main():
loop = asyncio.get_running_loop()
print(loop.time()) # 0.9779213
await my_sleep(1)
print(loop.time()) # 1.979251479
asyncio.run(main())
```
## Высокоуровневый API
### Как приостановить задачу на некоторое время?
- asyncio.sleep() приостанавливает действие текущей задачи на указанное время, позволяя выполнять другие задачи
- Если указан параметр result, его значение возвращается вызывающему объекту по завершении работы сопрограммы
```
import asyncio
async def main():
print(await asyncio.sleep(1, 'hello'))
asyncio.run(main())
```
### Как запустить сразу несколько задач?
```
import aiohttp
import asyncio
async def check_user_exists(user_id: int) -> bool:
async with aiohttp.ClientSession() as session:
url = f'https://example.org/users/{user_id}'
async with session.head(url) as resp:
print(user_id, resp.status == 200)
return resp.status == 200
async def main():
coros = (
check_user_exists(i) for i in range(100)
)
# <class 'list'>: [False, False, ...]
results = await asyncio.gather(*coros)
```
### asyncio.gather()
- asyncio.gather() запускает указанные awaitable объекты в конкурентном режиме и возвращает результаты выполнения в том же порядке
- Оборачивает объекты coroutine в asyncio.Task
- В случае отмены asyncio.gather() отменяются все запущенные (но еще не завершенные) задачи
### Что будет в случае исключения в одной из задач в gather?
```
async def trigger(position):
await asyncio.sleep(position)
if position == 3:
raise RuntimeError('Boooom!')
print('%d is ok' % position)
async def russian_roulette():
coros = (trigger(i) for i in range(8))
try:
await asyncio.gather(*coros)
except RuntimeError as e:
print(e)
await asyncio.sleep(10)
asyncio.run(russian_roulette())
# В данном примере все задачи выполнятся, но 3 задача упадет.
```
- Если параметр return_exceptions=False, первое исключение из задачи, запущенной в gather() будет брошено где запущен await.gather(). Остальные задачи, запущенные в gather(), продолжат выполнение. (значение по умолчанию)
- Если return_exceptions=True, исключения возвращаются в списке с результатами
- Если ожидаемый объект, переданный в gather() будет отменен - gather() не отменяется и остальные задачи продолжают выполняться
### Как отменить задачу?
```
async def coro():
print('start')
await asyncio.sleep(2)
print('finished')
async def cancel(task):
await asyncio.sleep(0.5)
task.cancel()
print('task.cancel() called')
try:
await task
except asyncio.CancelledError:
print('task successfully cancelled')
async def main():
task = asyncio.create_task(coro())
asyncio.create_task(cancel(task))
await asyncio.sleep(5)
assert task.cancelled()
# Задача coro будет прервана и не выведет finished:
# start
# task.cancel() called
# task successfully
# cancelled
```
Зачем ждать отмененную задачу? Чтобы не было мусора и чего-то невыполненого.
### Как защитить задачу от отмены?
asyncio.shield() защищает awaitable объект от отмены
```
async def coro():
print('start')
await asyncio.sleep(2)
print('finished')
async def main():
task = asyncio.create_task(coro())
shielded = asyncio.shield(task)
asyncio.create_task(cancel(shielded))
await asyncio.sleep(5)
assert not task.cancelled()
asyncio.run(main())
# start
# task.cancel() called
# finished
```
Важно помнить, что неубиваемые сопрограммы это может быть опасно.
В: Что происходит с ожидающим защищенной задачи, которую отменили?
О: Ожидающий shieldedзадачу получит CancelledError
### Как ограничить время ожидания awaitable объекта (таймаут)?
- asyncio.wait_for() ожидает выполнения задачи в течение указанного времени, если задача не успевает выполнится - она отменяется и бросается asyncio.TimeoutError
- Отмену задачи можно предотвратить с помощью asyncio.shield()
- Если кто-то отменяет wait_for(), то и обернутый им awaitable обьект тоже отменяется
```
async def eternity():
try:
await asyncio.sleep(3600)
except asyncio.CancelledError:
print('i was cancelled')
raise
print('finished')
async def main():
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout')
asyncio.run(main())
```
### Как подождать выполнения awaitable объектов?
- asyncio.wait() запускает awaitable-объекты и отдает поток управления, пока не будет выполнено условие return_when (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
- Оборачивает объекты coroutine в asyncio.Task
- Возвращает два множества: выполненные и выполняющиеся объекты
- Не отменяет задачи в случае FIRST_COMPLETED или FIRST_EXCEPTION
```
async def main():
tasks = [
asyncio.create_task(check_user_exists(i))
for i in range(100)
]
done, pending = await asyncio.wait(tasks, timeout=1)
# done: <class 'set'>: {<Task finished coro=...>}
# pending: <class 'set'>: {<Task finished coro=...>}
print(done, pending)
asyncio.run(main())
```
### Как еще подождать выполнения awaitable объектов?
- asyncio.as_completed() запускает awaitable объекты, возвращает итератор по результатам в порядке выполнения (сначала - самые быстро вычисленные)
- Можно указать параметр timeout, если все awaitable объекты не успеют выполниться - будет брошено исключение asyncio.TimeoutError
### Что сейчас выполняется?
asyncio.current_task() вернет выполняющуюся в данный момент задачу или None
```
import asyncio
async def main():
task = asyncio.current_task()
# <Task pending coro=<main() running at ...
print(task)
asyncio.run(main())
```
asyncio.all_tasks() вернет все незаконченные задачи, запущенные в цикле событий
```
async def main():
coros = (
check_user_exists(i)
for i in range(100)
)
asyncio.gather(*coros)
await asyncio.sleep(0.1)
tasks = asyncio.all_tasks()
print(type(tasks))
print(tasks)
```
## Низкоуровневый API
### Policy (Политика)
- Глобальный объект для каждого процесса, отвечает за выбор, настройку и управление циклом событий
- Определяет понятие контекста и управляет отдельным циклом событий для каждого контекста (по умолчанию контекст - текущий поток)
- По умолчанию используется DefaultEventLoopPolicy, использует SelectorEventLoop на *nix и ProactorEventLoop на Windows
- Есть альтернативные WindowsSelectorEventLoopPolicy и WindowsProactorEventLoopPolicy
- Можно получить текущую с помощью asyncio.get_event_loop_policy()
- Настраивается с помощью asyncio.set_event_loop_policy(policy)
## Цикл событий
- Ядро любого asyncio приложения
- Выполняет асинхронные задачи и функции обратного вызова из очереди, выполняет сетевой I/O, управляет выполнением попроцессов (см. _run_once())
https://github.com/python/cpython/blob/master/Lib/asyncio/base_events.py#L1798
- asyncio предоставляет нам SelectorEventLoop (*nix) и ProactorEventLoop (Windows)
### SelectorEventLoop
- Использует модуль selectors, который включает в себя
› SelectSelector
› PollSelector
› EpollSelector
› DevpollSelector
› KqueueSelector
› DefaultSelector
### Что умеет цикл событий?
- Планировать обратных вызовов
- Открывать сетевые подключения, в т.ч. защищенные (TLS)
- Создавать сетевые серверы
- Эффективно передавать файлы (sendfile)
- Мониторить файловые дескрипторы
- Напрямую работать с объектами socket
- Резолвить DNS (в потоках, потому что “unix плох”)
- Обрабатывать сигналы операционной системы (*nix)
- Выполнять код в пулах потоков или процессов
- Выполнять подпроцессы
### Запуск синхронного кода в процессе/потоке
```
def blocking_io():
with open('/dev/urandom', 'rb') as f:
return f.read(100)
def cpu_bound():
return sum(i * i for i in range(10 ** 7))
async def main():
loop = asyncio.get_running_loop()
# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
print('custom thread pool', result)
# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound)
print('custom process pool', result)
asyncio.run(main())
```
### Альтернативные циклы: uvloop
Есть большая сетевая нагрузка, тогда это вам подходит.
-Реализован поверх libuv, стабильный
- Может дать хороший прирост производительности, если есть очень много сетевого I/O
## Асинхронные интерфейсы
### Асинхронный менеджер контекста
- Способен приостановить выполнение в __aenter__ и __aexit__ методах
- Как и с обычными менеджерами контекста, можно использовать несколько объектов с оператором async with
- PEP 492
```
class TransactionCtx:
def __init__(self, conn):
self.conn = conn
async def __aenter__(self):
await self.conn.execute('BEGIN')
print('entering context')
return self
async def __aexit__(self, exc_type, exc, tb):
command = 'ROLLBACK' if exc else 'COMMIT'
await self.conn.execute(command)
print('exiting context')
```
Как использовать асинхронный менеджер контекста?
```
async def main():
conn = await connect(...)
async with TransactionCtx(conn) as transaction:
...
```
### Асинхронные итераторы
- Можно вызывать асинхронный код
- Итерируемый объект должен реализовывать метод __aiter__
- Итератор должен реализовывать асинхронный метод __anext__
- По завершении метод __anext__ должен бросить исключение StopAsyncIteration
- PEP 492
### Как написать асинхронный итератор?
```
class Ticker:
def __init__(self, delay, to):
self.delay = delay
self.i = 0
self.to = to
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= self.to:
raise StopAsyncIteration
self.i += 1
if i:
await asyncio.sleep(self.delay)
return i
async def main():
async for i in Ticker(1, 10):
print(i)
else:
print('hm?')
asyncio.run(main())
```
### Асинхронные генераторы
- Асинхронная функция, в которой используется yield
- Вместо send и throw - асинхронные asend() и athrow()
- Можно использовать с async for
- Не поддерживают yield from
- PEP 525
```
async def ticker(delay, to):
for i in range(to):
yield i
await asyncio.sleep(delay)
async def main():
async for i in ticker(1, 10):
print(i)
asyncio.run(main())
```
### Asynchronous comprehensions
В python 3.6+ поддерживаются все compherensions:
- множетсво: {i async for i in agen()}
- список: [i async for i in agen()]
- словарь: {i: i ** 2 async for i in agen()}
- генератор: (i ** 2 async for i in agen())
- Можно сочетать с for и условиями if
- PEP 530
```
async def ticker(delay, to):
for i in range(to):
yield i
await asyncio.sleep(delay)
async def main():
results = [
(i, j)
async for i in ticker(0.1, 5)
async for j in ticker(0.1, 5)
if not i % 2 and j % 2
]
print(results)
```
## aiohttp
### Что это и зачем?
```
import asyncio
async def handle(reader, writer):
# get client headers
# parse request headers
# route request (call appropriate handler)
# retrieve request body
# write response headers
# write response payload (body)
async def main():
server = await asyncio.start_server(handle, '127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
```
- Асинхронный HTTP клиент и сервер для asyncio
- Поддерживает HTTP 1.0, 1.1 не поддерживает 2.0
- Поддерживает вебсокеты
- Обработчик должен быть awaitable
- Парсер HTTP написан на С (взят из nodejs)
Пример HTTP сервера
```
from aiohttp import web
from aiohttp.web import run_app
async def handle(request):
return web.Response(text='Meow.')
app = web.Application()
app.router.add_route('GET', '/', handle)
run_app(app, port=8888)
```
Пример HTTP клиента
```
import asyncio
import aiohttp
async def main():
async with aiohttp.ClientSession() as session:
async with session.get('http://0.0.0.0:8888') as resp:
print(resp.status)
print(await resp.text())
asyncio.run(main())
```
Если у вам много запросов, то лучше использовать Сессии.
## Middleware
Промежуточный слой – это механизм “хуков” для обработки запросов и ответов.
```
async def test(request):
print('Handler function called')
return web.Response(text="Hello")
@web.middleware
async def middleware(request, handler):
print('Middleware called')
response = await handler(request)
print('Middleware finished')
return response
app = web.Application(middlewares=[middleware])
app.router.add_get('/', test)
web.run_app(app)
```
Очень мощный инструмент для изменения логики работы обработчиков и их ответов:
- Аутентификация и авторизация
- Обработка ошибок
- Модифицирование ответов
### Payloads
Напишем простое aiohttp приложение
```
async def init_pg(app):
app['pg'] = await asyncpg.create_pool('postgresql://user:hackme@0.0.0.0/test')
async def handle(request):
async with request.app['pg'].acquire() as conn:
row = await conn.fetchrow('SELECT 1 as col')
return aiohttp.web.Response(body={'data': row})
app = aiohttp.web.Application()
app.router.add_route('GET', '/', handle)
app.on_startup.append(init_pg)
aiohttp.web.run_app(app, port=8082)
```
Получим ошибку:
```
aiohttp.payload.LookupError
ValueError: Unsupported body type <class ‘dict'>
```
### Научим aiohttp работать с Mapping-объектами
```
from typing import Mapping
from aiohttp import PAYLOAD_REGISTRY, JsonPayload
PAYLOAD_REGISTRY.register(JsonPayload, Mapping)
```
Получим ошибку:
```
TypeError: Object of type Record is not JSON serializable
```
Что такое JsonPayload?
```
class JsonPayload(BytesPayload):
def __init__(self, value: Any,
encoding: str='utf-8',
content_type: str='application/json',
dumps: JSONEncoder=json.dumps,
*args: Any,
**kwargs: Any) -> None:
super().__init__(dumps(value).encode(encoding),
content_type=content_type, encoding=encoding,*args, **kwargs)
```
### singledispatch for win
```
from datetime import datetime
from functools import singledispatch
from asyncpg import Record
@singledispatch
def convert(value):
raise NotImplementedError(f'Unserializable value: {value!r}')
@convert.register(Record)
def convert_asyncpg_record(value: Record):
return dict(value)
@convert.register(datetime)
def convert_datetime(value: datetime):
return value.isoformat()
```
### Научим aiohttp работать с объектами типа asyncpg.Record и datetime
```
import json
from functools import partialmethod, partial
dumps = partial(json.dumps, default=convert)
class UniversalJsonPayload(JsonPayload):
__init__ = partialmethod(JsonPayload.__init__, dumps=dumps)
PAYLOAD_REGISTRY.register(UniversalJsonPayload, Mapping)