### Что такое стандарт asyncio? - Фундамент для асинхронных фреймворков - Базовые абстракции (Future/Coroutine/Task/AbstractEventLoop) - Высокоуровневый API - Сопрограммы (coroutine, generator coroutine), задачи (Task). Streams, примитивы для синхронизации, Queues. API для работы с процессами и межпроцессного взаимодействия. - Низкоуровневый API. loop.*, asyncio.Future. Транспорты и протоколы\ ### Что такое awaitable-объекты - Можно использовать в выражении await - Использование объекта в выражении await означает, что текущая сопрограмма переключит контекст и будет ожидать, пока выражение не будет выполнено - Существует 3 встроенных типа awaitable объектов: coroutine, Task, Future ### Что такое сопрограмма? Методика связи программных модулей друг с другом по принципу кооперативной многозадачности: модуль приостанавливается в определённой точке, сохраняя полное состояние (включая стек вызовов и счётчик команд), и передаёт управление другому. Тот, в свою очередь, выполняет задачу и передаёт управление обратно, сохраняя свои стек и счётчик. ### Что такое объект coroutine в python? Объект, который имеет ряд инструкций, умеет хранить свое состояние и может переключать контекст (передавать управление) Является более обобщенной формой подпрограмм (интерпретатор может выходить в подпрограмму в 1 точке и выходить в другой. В сопрограммах может быть несколько точек входа, выхода и возврата). Простыми словами - ГЕНЕРАТОР. А в 3.5+ специальный объект который ведет себя как генератор ### Что такое coroutine function? - Функция, возвращающая объект coroutine - Определяется ключевыми словами async def, может содержать ключевые слова await, async for, async with ``` import asyncio # <class 'function'> это coroutine function async def actual_coro(): print('hello') await asyncio.sleep(1) print('world') # <class 'coroutine'> это coroutine object coro = actual_coro() ``` В данном случае контекст переключает `await` ### Как запустить сопрограмму? 1. asyncio.run() ``` import asyncio async def get_text(): return 'hello, world!' async def say_text(): text = await get_text() await asyncio.sleep(1) return text result = asyncio.run(say_text()) print(result) # hello, world! ``` asyncio.run(): запуск event loop ожидания окончания работы асинхронной функции, завершение работы event loop и отмена всех порожденных асинхронных задач await: запуск из асинхронного кода с явным переключением контекста: await 2. asyncio.create_task(): запуск задачи в фоновом режиме ``` async def get_text(delay, text): await asyncio.sleep(delay) return text async def say_text(): task1 = asyncio.create_task(get_text(1, 'hello')) task2 = asyncio.create_task(get_text(1, 'world')) await task1 await task2 return ', '.join([task1.result(), task2.result()]) result = asyncio.run(say_text()) print(result) # hello, world ``` ### Что такое asyncio.Task и зачем он нужен? - Представляет из себя сопрограмму, запущенную или запланированную для запуска в цикле событий и контекст - Позволяет запускать задачи в фоновом режиме - Создается с помощью asyncio.create_task() или loop.create_task(), которые оборачивают сопрограмму в объект Task и планирует ее выполнение в цикле событий на ближайшее время - Абстракция, позволяющая отменить/прервать выполнение сопрограммы с помощью .cancel() ### Что такое asyncio.Future? - Cпециальный низкоуровневый awaitable объект, представляющий конечный результат выполнения асинхронной операции - Позволяет использовать низкоуровневый код, реализованный на функциях обратного вызова с высокоуровневым кодом на async/await - Создается с помощью loop.create_future() ``` import asyncio async def my_sleep(delay): loop = asyncio.get_running_loop() future = loop.create_future() loop.call_later(delay, future.set_result, True) await future async def main(): loop = asyncio.get_running_loop() print(loop.time()) # 0.9779213 await my_sleep(1) print(loop.time()) # 1.979251479 asyncio.run(main()) ``` ## Высокоуровневый API ### Как приостановить задачу на некоторое время? - asyncio.sleep() приостанавливает действие текущей задачи на указанное время, позволяя выполнять другие задачи - Если указан параметр result, его значение возвращается вызывающему объекту по завершении работы сопрограммы ``` import asyncio async def main(): print(await asyncio.sleep(1, 'hello')) asyncio.run(main()) ``` ### Как запустить сразу несколько задач? ``` import aiohttp import asyncio async def check_user_exists(user_id: int) -> bool: async with aiohttp.ClientSession() as session: url = f'https://example.org/users/{user_id}' async with session.head(url) as resp: print(user_id, resp.status == 200) return resp.status == 200 async def main(): coros = ( check_user_exists(i) for i in range(100) ) # <class 'list'>: [False, False, ...] results = await asyncio.gather(*coros) ``` ### asyncio.gather() - asyncio.gather() запускает указанные awaitable объекты в конкурентном режиме и возвращает результаты выполнения в том же порядке - Оборачивает объекты coroutine в asyncio.Task - В случае отмены asyncio.gather() отменяются все запущенные (но еще не завершенные) задачи ### Что будет в случае исключения в одной из задач в gather? ``` async def trigger(position): await asyncio.sleep(position) if position == 3: raise RuntimeError('Boooom!') print('%d is ok' % position) async def russian_roulette(): coros = (trigger(i) for i in range(8)) try: await asyncio.gather(*coros) except RuntimeError as e: print(e) await asyncio.sleep(10) asyncio.run(russian_roulette()) # В данном примере все задачи выполнятся, но 3 задача упадет. ``` - Если параметр return_exceptions=False, первое исключение из задачи, запущенной в gather() будет брошено где запущен await.gather(). Остальные задачи, запущенные в gather(), продолжат выполнение. (значение по умолчанию) - Если return_exceptions=True, исключения возвращаются в списке с результатами - Если ожидаемый объект, переданный в gather() будет отменен - gather() не отменяется и остальные задачи продолжают выполняться ### Как отменить задачу? ``` async def coro(): print('start') await asyncio.sleep(2) print('finished') async def cancel(task): await asyncio.sleep(0.5) task.cancel() print('task.cancel() called') try: await task except asyncio.CancelledError: print('task successfully cancelled') async def main(): task = asyncio.create_task(coro()) asyncio.create_task(cancel(task)) await asyncio.sleep(5) assert task.cancelled() # Задача coro будет прервана и не выведет finished: # start # task.cancel() called # task successfully # cancelled ``` Зачем ждать отмененную задачу? Чтобы не было мусора и чего-то невыполненого. ### Как защитить задачу от отмены? asyncio.shield() защищает awaitable объект от отмены ``` async def coro(): print('start') await asyncio.sleep(2) print('finished') async def main(): task = asyncio.create_task(coro()) shielded = asyncio.shield(task) asyncio.create_task(cancel(shielded)) await asyncio.sleep(5) assert not task.cancelled() asyncio.run(main()) # start # task.cancel() called # finished ``` Важно помнить, что неубиваемые сопрограммы это может быть опасно. В: Что происходит с ожидающим защищенной задачи, которую отменили? О: Ожидающий shieldedзадачу получит CancelledError ### Как ограничить время ожидания awaitable объекта (таймаут)? - asyncio.wait_for() ожидает выполнения задачи в течение указанного времени, если задача не успевает выполнится - она отменяется и бросается asyncio.TimeoutError - Отмену задачи можно предотвратить с помощью asyncio.shield() - Если кто-то отменяет wait_for(), то и обернутый им awaitable обьект тоже отменяется ``` async def eternity(): try: await asyncio.sleep(3600) except asyncio.CancelledError: print('i was cancelled') raise print('finished') async def main(): try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout') asyncio.run(main()) ``` ### Как подождать выполнения awaitable объектов? - asyncio.wait() запускает awaitable-объекты и отдает поток управления, пока не будет выполнено условие return_when (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED) - Оборачивает объекты coroutine в asyncio.Task - Возвращает два множества: выполненные и выполняющиеся объекты - Не отменяет задачи в случае FIRST_COMPLETED или FIRST_EXCEPTION ``` async def main(): tasks = [ asyncio.create_task(check_user_exists(i)) for i in range(100) ] done, pending = await asyncio.wait(tasks, timeout=1) # done: <class 'set'>: {<Task finished coro=...>} # pending: <class 'set'>: {<Task finished coro=...>} print(done, pending) asyncio.run(main()) ``` ### Как еще подождать выполнения awaitable объектов? - asyncio.as_completed() запускает awaitable объекты, возвращает итератор по результатам в порядке выполнения (сначала - самые быстро вычисленные) - Можно указать параметр timeout, если все awaitable объекты не успеют выполниться - будет брошено исключение asyncio.TimeoutError ### Что сейчас выполняется? asyncio.current_task() вернет выполняющуюся в данный момент задачу или None ``` import asyncio async def main(): task = asyncio.current_task() # <Task pending coro=<main() running at ... print(task) asyncio.run(main()) ``` asyncio.all_tasks() вернет все незаконченные задачи, запущенные в цикле событий ``` async def main(): coros = ( check_user_exists(i) for i in range(100) ) asyncio.gather(*coros) await asyncio.sleep(0.1) tasks = asyncio.all_tasks() print(type(tasks)) print(tasks) ``` ## Низкоуровневый API ### Policy (Политика) - Глобальный объект для каждого процесса, отвечает за выбор, настройку и управление циклом событий - Определяет понятие контекста и управляет отдельным циклом событий для каждого контекста (по умолчанию контекст - текущий поток) - По умолчанию используется DefaultEventLoopPolicy, использует SelectorEventLoop на *nix и ProactorEventLoop на Windows - Есть альтернативные WindowsSelectorEventLoopPolicy и WindowsProactorEventLoopPolicy - Можно получить текущую с помощью asyncio.get_event_loop_policy() - Настраивается с помощью asyncio.set_event_loop_policy(policy) ## Цикл событий - Ядро любого asyncio приложения - Выполняет асинхронные задачи и функции обратного вызова из очереди, выполняет сетевой I/O, управляет выполнением попроцессов (см. _run_once()) https://github.com/python/cpython/blob/master/Lib/asyncio/base_events.py#L1798 - asyncio предоставляет нам SelectorEventLoop (*nix) и ProactorEventLoop (Windows) ### SelectorEventLoop - Использует модуль selectors, который включает в себя › SelectSelector › PollSelector › EpollSelector › DevpollSelector › KqueueSelector › DefaultSelector ### Что умеет цикл событий? - Планировать обратных вызовов - Открывать сетевые подключения, в т.ч. защищенные (TLS) - Создавать сетевые серверы - Эффективно передавать файлы (sendfile) - Мониторить файловые дескрипторы - Напрямую работать с объектами socket - Резолвить DNS (в потоках, потому что “unix плох”) - Обрабатывать сигналы операционной системы (*nix) - Выполнять код в пулах потоков или процессов - Выполнять подпроцессы ### Запуск синхронного кода в процессе/потоке ``` def blocking_io(): with open('/dev/urandom', 'rb') as f: return f.read(100) def cpu_bound(): return sum(i * i for i in range(10 ** 7)) async def main(): loop = asyncio.get_running_loop() # 2. Run in a custom thread pool: with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor(pool, blocking_io) print('custom thread pool', result) # 3. Run in a custom process pool: with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor(pool, cpu_bound) print('custom process pool', result) asyncio.run(main()) ``` ### Альтернативные циклы: uvloop Есть большая сетевая нагрузка, тогда это вам подходит. -Реализован поверх libuv, стабильный - Может дать хороший прирост производительности, если есть очень много сетевого I/O ## Асинхронные интерфейсы ### Асинхронный менеджер контекста - Способен приостановить выполнение в __aenter__ и __aexit__ методах - Как и с обычными менеджерами контекста, можно использовать несколько объектов с оператором async with - PEP 492 ``` class TransactionCtx: def __init__(self, conn): self.conn = conn async def __aenter__(self): await self.conn.execute('BEGIN') print('entering context') return self async def __aexit__(self, exc_type, exc, tb): command = 'ROLLBACK' if exc else 'COMMIT' await self.conn.execute(command) print('exiting context') ``` Как использовать асинхронный менеджер контекста? ``` async def main(): conn = await connect(...) async with TransactionCtx(conn) as transaction: ... ``` ### Асинхронные итераторы - Можно вызывать асинхронный код - Итерируемый объект должен реализовывать метод __aiter__ - Итератор должен реализовывать асинхронный метод __anext__ - По завершении метод __anext__ должен бросить исключение StopAsyncIteration - PEP 492 ### Как написать асинхронный итератор? ``` class Ticker: def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1, 10): print(i) else: print('hm?') asyncio.run(main()) ``` ### Асинхронные генераторы - Асинхронная функция, в которой используется yield - Вместо send и throw - асинхронные asend() и athrow() - Можно использовать с async for - Не поддерживают yield from - PEP 525 ``` async def ticker(delay, to): for i in range(to): yield i await asyncio.sleep(delay) async def main(): async for i in ticker(1, 10): print(i) asyncio.run(main()) ``` ### Asynchronous comprehensions В python 3.6+ поддерживаются все compherensions: - множетсво: {i async for i in agen()} - список: [i async for i in agen()] - словарь: {i: i ** 2 async for i in agen()} - генератор: (i ** 2 async for i in agen()) - Можно сочетать с for и условиями if - PEP 530 ``` async def ticker(delay, to): for i in range(to): yield i await asyncio.sleep(delay) async def main(): results = [ (i, j) async for i in ticker(0.1, 5) async for j in ticker(0.1, 5) if not i % 2 and j % 2 ] print(results) ``` ## aiohttp ### Что это и зачем? ``` import asyncio async def handle(reader, writer): # get client headers # parse request headers # route request (call appropriate handler) # retrieve request body # write response headers # write response payload (body) async def main(): server = await asyncio.start_server(handle, '127.0.0.1', 8888) async with server: await server.serve_forever() asyncio.run(main()) ``` - Асинхронный HTTP клиент и сервер для asyncio - Поддерживает HTTP 1.0, 1.1 не поддерживает 2.0 - Поддерживает вебсокеты - Обработчик должен быть awaitable - Парсер HTTP написан на С (взят из nodejs) Пример HTTP сервера ``` from aiohttp import web from aiohttp.web import run_app async def handle(request): return web.Response(text='Meow.') app = web.Application() app.router.add_route('GET', '/', handle) run_app(app, port=8888) ``` Пример HTTP клиента ``` import asyncio import aiohttp async def main(): async with aiohttp.ClientSession() as session: async with session.get('http://0.0.0.0:8888') as resp: print(resp.status) print(await resp.text()) asyncio.run(main()) ``` Если у вам много запросов, то лучше использовать Сессии. ## Middleware Промежуточный слой – это механизм “хуков” для обработки запросов и ответов. ``` async def test(request): print('Handler function called') return web.Response(text="Hello") @web.middleware async def middleware(request, handler): print('Middleware called') response = await handler(request) print('Middleware finished') return response app = web.Application(middlewares=[middleware]) app.router.add_get('/', test) web.run_app(app) ``` Очень мощный инструмент для изменения логики работы обработчиков и их ответов: - Аутентификация и авторизация - Обработка ошибок - Модифицирование ответов ### Payloads Напишем простое aiohttp приложение ``` async def init_pg(app): app['pg'] = await asyncpg.create_pool('postgresql://user:hackme@0.0.0.0/test') async def handle(request): async with request.app['pg'].acquire() as conn: row = await conn.fetchrow('SELECT 1 as col') return aiohttp.web.Response(body={'data': row}) app = aiohttp.web.Application() app.router.add_route('GET', '/', handle) app.on_startup.append(init_pg) aiohttp.web.run_app(app, port=8082) ``` Получим ошибку: ``` aiohttp.payload.LookupError ValueError: Unsupported body type <class ‘dict'> ``` ### Научим aiohttp работать с Mapping-объектами ``` from typing import Mapping from aiohttp import PAYLOAD_REGISTRY, JsonPayload PAYLOAD_REGISTRY.register(JsonPayload, Mapping) ``` Получим ошибку: ``` TypeError: Object of type Record is not JSON serializable ``` Что такое JsonPayload? ``` class JsonPayload(BytesPayload): def __init__(self, value: Any, encoding: str='utf-8', content_type: str='application/json', dumps: JSONEncoder=json.dumps, *args: Any, **kwargs: Any) -> None: super().__init__(dumps(value).encode(encoding), content_type=content_type, encoding=encoding,*args, **kwargs) ``` ### singledispatch for win ``` from datetime import datetime from functools import singledispatch from asyncpg import Record @singledispatch def convert(value): raise NotImplementedError(f'Unserializable value: {value!r}') @convert.register(Record) def convert_asyncpg_record(value: Record): return dict(value) @convert.register(datetime) def convert_datetime(value: datetime): return value.isoformat() ``` ### Научим aiohttp работать с объектами типа asyncpg.Record и datetime ``` import json from functools import partialmethod, partial dumps = partial(json.dumps, default=convert) class UniversalJsonPayload(JsonPayload): __init__ = partialmethod(JsonPayload.__init__, dumps=dumps) PAYLOAD_REGISTRY.register(UniversalJsonPayload, Mapping)