## Процессы
Проблемы процессов в высоконагруженных веб-приложениях
- Требуют много памяти (особенно для питона*)
- Неэффективная коммуникация
- Оверхед со стороны ОС (user-space to kernel-space trip, scheduling, context switching)
PS1. В питоне есть счетчик ссылок, который мешает утилизировать CoW(Copy-on-Write). С этим борются, но с переменным успехом
PS2. В реальном мире нет форков на каждый запрос, в основном используется prefork (см. uWSGI, gunicorn)
## Потоки
- Требуют меньше памяти чем процессы
- Более эффективная коммуникация
- Более легковесное переключение*
https://www.quora.com/How-does-thread-switching-differ-from-process-switching-What-is-the-performance-difference/answer/Robert-Love-1
Преимущества многопоточности
1. Простой для понимания код
2. Работает с общей памятью
3. Наличие большого объема знаний и лучших практик, доступных для написания многопоточных приложений
Недостатки:
1. Сложность в исправления багов. Ошибки многопоточности и условия гонки в многопоточных программах — самые сложные виды ошибок для исправления.
2. Потоки ресурсоёмкие. Для создания потоков требуются дополнительные ресурсы операционной системы, например предварительно выделенное пространство стека для каждого потока, которое использует виртуальную память процесса заранее.
3. Многопоточность может повлиять на производительность. При очень высоких уровнях параллелизма (5000 потоков) также может быть влияние на производительность из-за затрат на переключение контекста
4. Потоки негибкие. Операционная система будет постоянно делить процессорное время со всеми потоками независимо от того, готов ли поток выполнять работу или нет. Например, поток может ожидать данных в сокете, но планировщик ОС может по-прежнему переключаться на этот поток и обратно тысячи раз, прежде чем потребуется выполнить какую-либо реальную работу.
Пример работы многопоточности
```
from attr import attrs, attrib
import sys
import threading
from queue import Queue
class ThreadBot(threading.Thread):
def __init__(self):
super().__init__(target=self.manage_table)
self.cutlery = Cutlery(knives=0, forks=0)
self.tasks = Queue()
def manage_table(self):
while True:
task = self.tasks.get()
if task == 'prepare table':
kitchen.give(to=self.cutlery, knives=4, forks=4)
elif task == 'clear table':
self.cutlery.give(to=kitchen, knives=4, forks=4)
elif task == 'shutdown':
return
@attrs
class Cutlery:
knives = attrib(default=0)
forks = attrib(default=0)
def give(self, to: 'Cutlery', knives=0, forks=0):
self.change(-knives, -forks)
to.change(knives, forks)
def change(self, knives, forks):
self.knives += knives
self.forks += forks
kitchen = Cutlery(knives=100, forks=100)
bots = [ThreadBot() for i in range(10)]
for bot in bots:
for i in range(int(sys.argv[1])):
bot.tasks.put('prepare table')
bot.tasks.put('clear table')
bot.tasks.put('shutdown')
print('Kitchen inventory before service:', kitchen)
for bot in bots:
bot.start()
for bot in bots:
bot.join()
print('Kitchen inventory after service:', kitchen)
```
Если мы запустим код 100 раз, то ошибок не будет, а вот если 10000, тогда мы не получим обатрно наши столовые приборы. Ошибка кроется в данной функции
```
def change(self, knives, forks):
self.knives += knives
self.forks += forks
```
Проблема с вытесняющей многозадачностью заключается в том, что любой поток, занимающийся инкрементом может быть прерван в любой момент, и другому потоку может быть предоставлена возможность выполнить её снова.
В этом случае предположим, что ThreadBot A выполняет шаг 1, а затем планировщик ОС приостанавливает A и переключается на ThreadBot B. B также считывает текущее значение self.knives; затем выполнение возвращается к A. A увеличивает свою сумму и записывает ее обратно, но затем B продолжает работу с того места, где она была приостановлена (после шага 1), и увеличивает и записывает свою новую сумму, тем самым стирая изменение, сделанное A!
Эту проблему можно решить, установив блокировку на изменение общего состояния:
```
def change(self, knives, forks):
with self.lock:
self.knives += knives
self.forks += forks
```
### Проблемы потоков в высоконагруженных веб-приложениях на питоне
- Другие проблемы с коммуникацией. У нас общая память и возникают гонки и дедлоки.
- Все еще оверхед со стороны ОС, потому что потоки нужно шедулить.
- Даже если ограничить количество потоков пулами — пулы забиваются, приложение встает
- В питоне еще и GIL(Global Interpreter Lock). Не дает двум потокам интерпретатора выполняться параллельно
## Зачем нам GIL?
- Структуры данных интерпретатора не являются потокобезопасными
- Поэтому их нужно защищать от гонок
### Есть ли жизнь после GIL?
- Наличие GIL не означает, что потоками в питоне нельзя пользоваться
- Более того, некоторые вещи можно сделать только при помощи потоков (работа с ФС, сторонние либы...)
- GIL не является боттлнеком в большинстве приложений (ваша база данных умрет раньше)
### И все-таки...
- GIL вносит дополнительный оврехед
- Про GIL надо знать
- По возможности проводите нагрузочное тестирование и делайте бенчмарки
### Почему задачи ждут завершения предыдущих?
Ответ - Блокирующие системные вызовы. Мы блокируемся на I/O - взаимодейсвтеи с внешним миром. Например, Ввод/вывод, сеть, ФС.
### Что нужно для асинхронного кода
1. Механизм, который предоставит информацию о том, заблокируем ли мы, если захотим прочитать информацию из сокета. Решение - Linux может сообщить о том, что I/O заблокирует поток
```
import os, socket
def server():
sock = create_passive_socket()
while True:
conn, addr = sock.accept()
conn.setblocking(0) # fcntl(fd, F_SETFL, flags | O_NONBLOCK)
while True:
try:
request_data = conn.read(1024)
except socket.error as se: # se.errno == EAGAIN (*)
handle_other_requests()
```
Проблема - Мы не можем тратить по системному вызову на каждый
сокет.
Решение - Поллинг
- Системный вызов select
- В него передается три множества дескрипторов
- Те, из которых я хочу прочитать*
- Те, в которые я хочу написать
-Те, в которых возможно что-то сломалось
- ОС, скажи, где я могу реализовать свое желание и не заблокироваться? С таймаутом, пожалуйста
```
rlist, wlist, elist = select(read_fds, write_fds, exc_fds, timeout)
```
\\ * Под "прочитать" иногда подразумевается странное, вроде ECONNRESET. Читайте ман!
\\ ** На практике более производительным системным вызовом для поллинга является epoll (или kqueue для BSD-like)
Подведем некоторую черту, мы хотим взять на себя ответственность за переключения задач. Почему мы хотим это сделать во первых
потому что мы знаем что задача у нас, они короткие, мы знаем что наши задачи большую часть времени просто ждут и ничего полезного не делают, у нас их много они очень короткие ну то есть как бы интерпретатор работает мало мы большую часть времени просто ждем и мы говорим, зачем мы
обязываем операционную систему заниматься этим - создавать потоки и
процессы, которые являются дорогими абстракциями, если мы это все можем сделать сами. Операционная система нам предоставляет интерфейс, мы можем взять много сокетов, ни один процесс 1 сокет, а один процесс много сокетов и мы их сами обрабатываем, мы сами знаем какую коротенькую задачу выполнить прежде чем этот socket опять отправиться в ожидающий режим, что мы экономим мы вместо того чтобы переключать задачи при помощи операционной системы мы переключаем задачи сами , это дешевле чем тратить ресурсы операционной системы.
### EventLoop
- Имеется поток событий (или сообщений)
- В ответ на событие нужно запустить обработчик
- Обработчики в процессе выполнения могут порождать новые события
Примеры событий: из сокета можно прочитать без блокировки, был принят сигнал, сработал таймер, завершилась какая-то функция
Нужно помнить, что обработчики в процессе выполения события порождают новые события.
### Что делать, если что-то сломалось?
Плохая идея - создавать обработчик(callback) на каждлое событие, это называется callback hell. Так жить нельзя, код нечитаемый и невозможно отследить.
### За счет чего асинхронные приложения могут быть более производительными?
Приложение переносит на себя ответственность за переключение задач.
У нас есть программа в userspace, она не переключается в ядро. У неё нет тяжой структуры данных внутри данных. Мы не платим за то, чем мы не пользуемся. Мы берем на себя роль выполнения задач, вместо ОС.
### Проблемы асинхронного программирования
1. Синхронный и асинхронный код не живут вместе.
Проблема: для некоторых задач отсутствуют асинхронные
библиотеки
Решения:
- Threadpool
- Написать самим
2. Не для всего есть асинхронные интерфейсы
Проблема: в Linux до сих пор не изобретен асинхронный
интерфейс для работы с файловой системой
Решения:
- Threadpool
3. CPU-bound задачи
Проблема: event-loop должен постоянно вращаться, если кто-то его заблокирует — встанет всё приложение
Например - вычисление hash суммы на процессоре.
```
hash = calculate_sha(war_and_peace)
```
Решения:
- Process pool
- Отдельный микросервис
4. Иногда RPS ограничен не веб-сервером
Проблема: если ваше приложение в 99% случаев ходит в базу данных, ваша производительность равна производительности БД (сервер держит 99999 RPS, база держит 10 QPS)
Решения:
- Думайте над архитектурой
### Когда использовать асинхронщину
- Микросервисы (I/O-bound, не CPU-bound)
- Долгоживущие соединения (websocket, раздача файлов)
- Есть производительная инфраструктура (шардированные базы, кеши, write-heavy очереди)
- Экономия ресурсов серверов
### Когда не использовать асинхронщину
- CPU-bound
- Боттлнек в инфраструктуре (один инстанс базы)
- Вы очень богатые и можете себе позволить сколько угодно железа
## Особенности asyncio в Python
Сценарий работы асинхоонного приложения
1. Запускаем event loop
2. Вызовываем async/await функции
3. Создаем задачи для запуска в цикле
4. Ожидаем завершения задач
5. Останавливаем цикл после завершения всех задач
Например :
```
import asyncio, time
async def main():
print(f'{time.ctime()} Hello!')
await asyncio.sleep(1)
print(f'{time.ctime()} Goodbye!')
loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_until_complete(task)
pending = asyncio.all_tasks(loop=loop)
for task in pending:
task.cancel()
group = asyncio.gather(*pending, return_exceptions=True)
loop.close()
```
Заметки по коду
1. loop = asyncio.get_event_loop() - нам нужен экземпляр цикла, прежде чем мы сможем запускать какие-либо сопрограммы.
2. task = loop.create_task(coro) - сопрограммы не будет выполняться, пока мы этого не сделаем. create_task() планирует выполнение сопрограммы в цикле. Возвращенный объект задачи можно использовать для отслеживания состояния задачи (например, выполняется ли она еще или завершена), а также может использоваться чтобы получить значение результата из завершенной сопрограммы. Можно отменить задачу с помощью task.cancel().
3. loop.run_until_complete(coro). Этот вызов заблокирует текущий поток, который обычно является основным потоком.run_until_complete() будет поддерживать выполнение цикла только до тех пор, пока не завершится корутина, но все другие задачи, запланированные в цикле, также будут выполняться во время выполнения цикла. Внутри asyncio.run() вызывает run_until_complete() блокирует основной поток.
4. group = asyncio.gather(task1, task2, task3) - Когда main разблокируется либо из-за полученного сигнала процесса, либо из-за остановки цикла каким-либо кодом, вызывающим loop.stop(), запустится код после run_until_complete(). Идея показанная здесь, состоит в том, чтобы собрать все еще ожидающие выполнения задачи, отменить их, а затем снова использовать функцию loop.run_until_complete(), пока эти задачи не будут выполнены. Gather() — это метод сбора. Обратите внимание, что asyncio.run() выполнит всё - отмену, сбор и ожидание для ожидающих завершения задач.
5. loop.close() - нужно вызвать для остановки цикла, очистки всех очередей и остановки исполнения.
Дополнительно
5.1 await loop.run_in_executor(None, func) метод используется, когда нужно запустить что-то в отдельном потоке или даже в отдельном процессе. Здесь мы передаем блокирующую функцию для запуска в исполнителе по умолчанию. Обратите внимание, что run_in_executor() не блокирует основной поток: он только планирует запуск задачи исполнителя (он возвращает Future, что означает, что вы можете ожидать его, если метод вызывается внутри другой функции сопрограммы). Задача исполнителя начнет выполняться только после вызова run_until_complete(), что позволяет циклу обработки событий начать обработку событий.
### Уровни абстракций в asyncio
|Уровень|Концепт|Реализация|
|---|---|---|
|9|Network: streams|StreamReader, StreamWriter, asyncio.open_connection(), asyncio.start_server()|
|8|Network: TCP & UDP | Protocol|
|7| Network: transports|BaseTransport |
|6| Tools|asyncio.Queue |
|5|Subprocesses & threads |run_in_executor(), asyncio.subprocess |
|4|Tasks |asyncio.Task, asyncio.create_task() |
|3| Futures|asyncio.Future |
|2|Event loop |asyncio.run(), BaseEventLoop |
|1|Coroutines |async def, async with, async for, await |
1. Корутины - самый нижний уровень асинхронного кода (есть несколько фреймворков на корутинах - Curio и Trio). Данный уровень нужен для понимания как писать асинхронные функции и использовать await для вызова и выполнения других сопрограмм.
2. EventLoop. Сопрограммы сами по себе бесполезны: они ничего не сделают без EventLoop, он их запускает. asyncio предоставляет как спецификацию цикла AbstractEventLoop, так и реализацию BaseEventLoop. Есть библиотека uvloop, которая обеспечивает гораздо более быструю реализацию цикла, чем в стандартной библиотеки asyncio. Уровень нужен для понимания как запускать, останавливать и взаимодействовать с EventLoop.
3. Уровни 3 и 4 содержат Futures и Tasks, которые очень тесно связаны межу собой; Task является подклассом Future. Экземпляр Future представляет собой текущее действие, которое вернет результат через уведомление в EventLoop, а Task представляет собой Coroutine, работающую в EventLoop. Кратко: future «знает о EventLoop», в то время как task одновременно «знает о EventLoop» и «знает о Coroutine».
5. Исполнитель. Представляет средства для запуска и ожидания функций, которые должны выполняться в отдельном потоке или даже в отдельном процессе. Исполнители нужны для использования блокирующего кода в асинхронном приложении, и реальность такова, что большинство сторонних библиотек еще не асинхронно-совместимы.
6. Уровень 6 представляет дополнительные инструменты, поддерживающие асинхронность, такие как asyncio.Queue. Вы не можете использовать queue.Queue непосредственно внутри сопрограмм, потому что его get() заблокирует основной поток. Если нужно передать данные одной или нескольким долго работающим корутинам, лучший способ сделать это — использовать asyncio.Queue.
7. Наконец, у нас есть уровни сетевого ввода-вывода, с 7 по 9. streams API предоставляет самый простой способ управления сокетами по сети.
### Coroutines
Самое простое объявление корутины, выглядит как обычная функция, но начинается с async
```
>>> async def f():
... return123
...
>>> type(f)
<class 'function'>
>>> import inspect
>>> inspect.iscoroutinefunction(f)
True
```
Сюрприз! тип f — это не «сопрограмма», а обычная функция. Хотя принято их называть корутинами.
```
>>> coro = f()
>>> type(coro)
<class 'coroutine'>
>>> inspect.iscoroutine(coro) True
```
Сопрограмма — это объект, который инкапсулирует возможность возобновления базовой функции, которая была приостановлена до её завершения. Если это звучит знакомо, то это потому, что сопрограммы очень похожи на генераторы.
Внутреннее устройство корутины: использование send() и StopIteration
```
async def f():
return 123
coro = f()
try:
coro.send(None)
except StopIteration as e:
print('The answer was:', e.value) ...
The answer was: 123
```
Сопрограмма инициируется "отправкой" None. Внутри это то, что цикл событий будет делать с сопрограммами, вам никогда не придется делать это вручную. Все созданные вами сопрограммы будут выполняться либо с помощью loop.create_task(coro), либо с помощью await coro. Цикл выполняет .send(None) за кулисами.
### Новое ключевое слово await
await всегда принимает параметр и будет принимать только то, что называется awaitable, которое может быть только:
• Сопрограммой (т. е. результат async def function).
• Любым объектом, реализующим специальный метод __await__(). Этот метод
должен возвращать итератор.
```
async def f():
await asyncio.sleep(1.0) return 123
async def main():
result = await f()
return result
```
Прежде чем перейти к циклу событий, полезно знаит, как сопрограммы могут получать исключения. Это чаще всего используется для отмены: когда вы вызываете task.cancel(), цикл событий будет использовать coro.throw() для вызова asyncio.CancelledError внутри вашей сопрограммы.
```
coro = f()
coro.send(None)
coro.throw(Exception, 'blah')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 2, in f Exception: blah
blah
```
Чтобы понять, что такое отмена задачи — это не что иное, как постоянное создание исключения (и его обработка)
```
async def f():
try:
while True:
await asyncio.sleep
except asyncio.CancelledError:
print('Nope!')
while True: await asyncio.sleep(0)
else:
return 111
>>> coro = f()
>>> coro.send(None)
>>> coro.throw(asyncio.CancelledError)
Nope!
```
Для упраления исключениями и утсранения ручной работы был создан EventLoop
```
async def f():
await asyncio.sleep(0)
return 111
loop = asyncio.get_event_loop()
coro = f()
loop.run_until_complete(coro)
>> 111
```
loop.run_until_complete(coro) выполняет все вызовы метода .send(None) за нас и находит завершение сопрограммы с исключением StopIteration, которое также содержит наше возвращаемое значение.
### Event Loop
Цикл событий в asyncio обрабатывает все переключения между сопрограммами, а также перехватывает исключения StopIteration и многое другое, например прослушивание сокетов и файловых дескрипторов для событий.
Можно обойтись без прямой работы с Event Loop: код asyncio может быть полностью написан с использованием вызовов await, инициированных вызовом asyncio.run(coro). Однако иногда может потребоваться некоторая степень взаимодействия с самим Event Loop, и здесь мы обсудим, как это сделать
Лучше так
```
asyncio.get_running_loop(), callable from inside the context of a coroutine
```
была введена c Python 3.8.
Чем так
```
asyncio.get_event_loop(), callable from anywhere.
```
Если мы находимся внутри сопрограммы и нам нужен доступ к экземпляру цикла, тогда вызываем get_running_loop() для его получения.
get_event_loop() и get_running_loop() работают одинаково, почему они оба существуют? Метод get_event_loop() работает только внутри одного потока. get_event_loop() завершится ошибкой, если будет вызвана внутри нового потока, если создать новый цикл с помощью new_event_loop() и не установить этот новый экземпляр как цикл для этого потока, вызвав set_event_loop().
Напротив, get_running_loop() (рекомендуемый метод) всегда будет делать то, что вы ожидаете: поскольку его можно вызвать только в контексте сопрограммы, задачи или функции, вызываемой из одного из них, он всегда обеспечивает текущий цикл выполнения событий, что почти всегда то, что вы хотите.
Появление get_running_loop() упростило создание фоновых задач. Например, функция сопрограммы, внутри которой создаются и не ожидаются дополнительные задачи.
```
async def f():
# Create some tasks!
loop = asyncio.get_event_loop()
for i in range():
loop.create_task(<some other coro>)
```
В этом примере намерение состоит в том, чтобы запустить новые задачи внутри сопрограммы. Не ожидая их, мы гарантируем, что они будут выполняться независимо от контекста выполнения внутри функции сопрограммы f(). Фактически, f() завершит работу до завершения задач, которые он запустил.
До Python 3.7 для планирования задачи необходимо было сначала получить экземпляр цикла, но с появлением get_running_loop() появились другие функции asyncio, которые его используют, такие как asyncio.create_task(). Начиная с Python 3.7, код для создания асинхронной задачи теперь выглядит
```
import asyncio
async def f():
# Create some tasks!
for i in range():
asyncio.create_task(<some other coro>)
```
### Tasks and Futures
Отличие Tasks от Future
Сопрограмма (coroutine) — результат вызова асинхронной функции, представляющий собой выполнение этой функции, способное приостанавливаться. Так как в общем случае невозможно определить сколько раз и на какое время выполнение будет приостановлено, невозможно и сказать когда оно будет закончено. Ваш код может либо ждать завершения выполнения сопрограммы с помощью оператора await, либо поручить ожидание циклу событий и продолжить свой выполнение.
В первом случае
```
async def callee():
print('Hello')
async def caller():
await callee()
print('World')
```
выполнение caller приостановится до выполнения callee. В этот момент какие-то другие операции в каких-то других сопрограммах могут продолжаться, но caller будет ждать там, где выполнил await.
Во втором случае
```
async def callee():
print('Hello')
async def caller():
asyncio.create_task(callee())
print('World')
```
caller сразу же продолжит свою работу. Строка "World" будет выведена раньше, чем "Hello". Здесь мы видим, что caller поставил циклу событий задачу выполнить сопрограмму callee.
Но что если, callee будет возвращать какое-то значение, которое нужно вызывающей стороне, но не прямо сейчас, а когда будет готово? Вот тут-то на сцену выходят футуры.
Футура (Future) - будущий результат выполнения сопрограммы. Метод ensure_future поручает циклу событий выполнить сопрограмму и сразу же, в момент вызова, возвращает футуру, в которой будет значение, но неизвестно когда. Вызывающая сторона может подождать выполнения футуры так же, как ожидало саму сопрограмму
```
async def callee():
return 'Hello'
async def caller():
loop = asyncio.get_running_loop()
future = loop.ensure_future(callee())
result = await future
print(result + ' World')
```
Или может заняться своими делами, периодически проверяя готовность
```
async def caller():
loop = asyncio.get_event_loop()
future = loop.ensure_future(callee())
while not future.done():
# Какие-нибудь циклические дела
print(future.result() + ' World')
```
Или установить на футуру колбэк
```
async def caller():
loop = asyncio.get_event_loop()
future = loop.ensure_future(callee())
future.add_done_callback(lambda f: print(f.result() + ' World'))
# какие-нибудь другие важные дела
```
Или может собрать их в список и ждать все. Или не все, а только ту, которая будет выполнена первой, а остальные проигнорировать. Или передать футуру другой сопрограмме, а самой заняться каким-нибудь другим делом. В общем, это "очень полезный горшок, куда можно класть какие хочешь вещи".
Подытоживая: Task - это задача, поставленная циклу событий, на выполнение coroutine, одновременно являющаяся Future, которая представляет собой результат выполнения Task когда-нибудь в будущем.
У футур есть метод проверки - статус завершения
```
>>> from asyncio import Future
>>> f = Future()
>>> f.done()
False
```
Future может также выполнить следующее:
1. Иметь "result" (используется .set_result(значение) для его установки и .result() для его получения)
2. Может быть отменен с помощью .cancel() (проверка что отменен с помощью .cancelled())
3. Иметь callback функцию, которая будут запущена, когда future завершится
```
import asyncio
async def main(f: asyncio.Future):
await asyncio.sleep(1)
f.set_result('I have finished.')
loop = asyncio.get_event_loop()
fut = asyncio.Future()
print(fut.done())
print(loop.create_task(main(fut)))
print(loop.run_until_complete(fut))
print(fut.done())
print(fut.result())
```
Интересный момент
```
fut = asyncio.Future()
```
Вручную создается будущий экземпляр Future . Обратите внимание, что этот экземпляр (по умолчанию) привязан к нашему циклу, но он не привязан и не будет привязан ни к одной сопрограмме (для этого и предназначены задачи).
```
loop.create_task(main(fut))
```
Помните, что все, что делает сопрограмма main() - это переходит в спящий режим, а затем переключает экземпляр Future . (Обратите внимание, что сопрограмма main() еще не запустится: сопрограммы запускаются только во время выполнения цикла.)
```
loop.run_until_complete(fut)
```
Здесь мы используем функцию run_until_complete() для экземпляра Future, а не для экземпляра задачи. Теперь, когда цикл запущен, сопрограмма main() начнет выполняться.
В документации Python всё очень сложно описпано, вот простое описание
```
asyncio.ensure_future(coro_or_future, *, _loop=None)
```
• Если вы передадите функции сопрограмму, она создаст экземпляр Задачи (и ваша сопрограмма будет запланирована для запуска в цикле событий). Это идентично вызову asyncio.create_task() (или loop.create_task()) и возврату нового экземпляра задачи.
• Если вы передадите экземпляр Future (или экземпляр Task, поскольку Task является подклассом Future), вы получите то же самое, что и раньше, без изменений.
Например:
```
import asyncio
async def f():
pass
coro = f()
loop = asyncio.get_event_loop()
task = loop.create_task(coro)
assert isinstance(task, asyncio.Task)
new_task = asyncio.ensure_future(coro)
assert isinstance(new_task, asyncio.Task)
mystery_one = asyncio.ensure_future(task)
assert mystery_one is task
```
Если я переименую функцию listify() в ensure_list(), то вы должны начать видеть параллель с asyncio.ensure_future(): он всегда пытается принудить аргумент к Future (или подклассу).
```
asyncio.gather(*aws, loop=None, ...)
```
Параметр aws означает "awaitable objects", которые включают сопрограммы, задачи и futures. Внутренне gather() использует ensure_future() для принуждения к типу: задачи и futures остаются нетронутыми, в то время как задачи создаются для сопрограмм.
### Async Context Managers: async with
Асинхронный менеджер контекста - это менеджер контекста, который может приостановить выполнение в своих методах __aenter__() и __aexit__().
async with — определяет, что при входе в контекстный блок и выходе из него может быть переключение выполнения с текущей сопрограммы. Так же, как и в случае с асинхронным генератором, вместо магических методов: __enter__ и __exit__ следует использовать функционально аналогичные __aenter__ и __aexit__.
```
class Connection:
def __init__(self, host, port):
self.host = host
self.port = port
async def __aenter__(self):
self.conn = await get_conn(self.host, self.port)
return conn
async def __aexit__(self, exec_type, exc, tb):
await self.conn.close()
async with Connection('localhost', 9001) as conn:
pass
```
Неблокирующий contextlib
```
from contexlib import asynccontextmanager
@asynccontextmanager
async def web_page(url):
data = await download_webpage(url)
yield data
await update_stats(url)
async with web_page('google.com') as data:
process(data)
```
Встречаются ситуации, когда нужно использовать блокирующую функцию в своей программе, но изменить код в этой функции невозможно.
Такая ситуация обычно возникает со сторонними библиотеками, и отличным примером является библиотека requests, которая повсюду использует блокировку вызовов.
```
from contexlib import asynccontextmanager
@asynccontextmanager
async def web_page(url):
loop = asyncio.get_event_loop()
data = await loop.run_in_executor(
None, download_webpage, url)
yield data
await loop.run_in_executor(None, update_stats, url)
async with web_page('google.com') as data:
process(data)
```
Для примера предположим, что мы не можем изменить код двух блокирующих вызовов, download_webpage() и update_stats(); т.е. Мы не можем изменить их, чтобы они были сопрограмами. Это плохо, потому что самым тяжким грехом программирования на основе событий является нарушение правила, согласно которому вы никогда, ни при каких обстоятельствах не должны препятствовать обработке событий циклом событий. Чтобы обойти проблему, мы будем использовать исполнителя для выполнения блокирующих вызовов в отдельном потоке. Исполнитель становится доступным для нас как атрибут самого цикла событий.
Если вы хотите использовать исполнителя по умолчанию (который является
ThreadPoolExecutor), вы должны передать None в качестве значения для аргумента исполнителя.
### Async Iterators: async for
Cтандартный (не асинхронный) итератор определяется с помощью методов __iter__() и __next__()
Для создания асинхронного итератора требуется несколько вещей:
1.Надо реализовать def __alter__(). (Примечание: не с async def!)
2. __aiter__() должен возвращать объект, реализующий async def __anext__().
3. __anext__() должен возвращать значение для каждой итерации и вызывать StopAsyncIteration по завершении.
Асинхронный итератор для извлечения данных из Redis
```
import asyncio
from typing import Any
from aioredis import create_redis
class OneAtATime:
def __init__(self, redis, keys):
self.redis = redis
self.keys = keys
def __aiter__(self):
self.ikeys = iter(self.keys)
return self
async def __anext__(self):
try:
k = next(self.ikeys)
except StopIteration:
raise StopAsyncIteration
value = await self.redis.get(k)
return value
async def main():
redis = await create_redis(('localhost', 6379))
keys = ['Americas', 'Africa', 'Europe', 'Asia']
async for value in OneAtATime(redis, keys):
await do_something_with(value)
async def do_something_with(stuff: Any):
print(stuff)
if __name__ == "__main__":
asyncio.run(main())
```
### Более простой код с асинхронными генераторами
• Сопрограммы и генераторы - это совершенно разные понятия.
• Асинхронные генераторы ведут себя так же, как обычные генераторы.
• Для итерации вы используете async для асинхронных генераторов вместо обычных генераторов.
```
import asyncio
from aioredis import create_redis
async def main():
redis = await create_redis(("localhost", 6479))
keys = ["america", "africa", "europe", "asia"]
async for value in one_at_a_time(redis, keys):
await do_something_with(value)
async def one_at_a_time(redis, keys):
for k in keys:
value = await redis.get(k)
yield value
asyncio.run(main())
```
### Async Comprehensions
```
import asyncio
async def doubler(n):
for i in range(n):
yield i, i ** 2
await asyncio.sleep(0.2)
async def main():
result = [x async for x in doubler(3)]
print(result)
result = {x: y async for x, y in doubler(3)}
print(result)
result = {x async for x in doubler(3)}
print(result)
asyncio.run(main())
```
```
import asyncio
async def f(x):
await asyncio.sleep(0.1)
return x + 100
async def factory(n):
for x in range(n):
await asyncio.sleep(0.1)
yield f, x
async def main():
results = [await f(x) async for f, x in factory(3)]
print(f'result = {results}')
asyncio.run(main())
```
### Запуск и остановка программ
Когда функция async def main() завершается, выполняются следующие действия:
1. Собераются все еще не завершенные объекты задачи (если таковые имеются).
2. Отменяем эти задачи (это вызовет ошибку CancelledError внутри каждой запущенной сопрограммы, которую вы можете решить обработкой try/except в теле функции сопрограммы.
3. Собераем все эти задачи в групповое задачу.
4. Используется функция run_until_complete() для групповой задачи, чтобы дождаться ее завершения, т.е. разрешить исключение CancelledError и обработать его.
asyncio.run() выполняет эти действия за вас, но, несмотря на эту помощь, при создании ваших первых нескольких нетривиальных приложений asyncio будет предпринята попытка избавиться от сообщений об ошибках, таких как “Задача была уничтожена, но она находится на рассмотрении!” во время завершения работы. Это происходит потому, что ваше приложение не ожидало одного или нескольких предыдущих шагов.
```
import asyncio
async def f(delay):
await asyncio.sleep(delay)
loop = asyncio.get_event_loop()
t1 = loop.create_task(f(1))
t2 = loop.create_task(f(2))
loop.run_until_complete(t1)
loop.close()
```
При запуске мы получим следующее сообщение
```
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<f() running at C:\Users\nickd\GDriveSyncFolder\pending.py:4> wait_for=<Future pending cb=[Task.task_wakeup()]>>
```
Эта ошибка говорит о том, что некоторые задачи еще не были завершены, когда цикл был завершен.
Мы хотим избежать этого, и именно поэтому идея завершения работы состоит в том, чтобы собрать все незавершенные задачи, отменить их, а затем позволить им всем завершиться перед закрытием цикла. asyncio.run() выполняет все эти шаги за нас, но важно детально понять процесс.
```
import asyncio
from asyncio import StreamReader, StreamWriter
async def echo(reader: StreamReader, writer: StreamWriter):
print('New connection.')
try:
while data := await reader.readline():
writer.write(data.upper())
await writer.drain()
print('Leaving Connection.')
except asyncio.CancelledError:
print('Connection dropped!')
async def main(host='127.0.0.1', port=8900):
server = await asyncio.start_server(echo, host, port)
async with server:
await server.serve_forever()
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Bye!')
```
Cопрограммf echo() будет использоваться (сервером) для создания сопрограммы для каждого нового соединения. Функция использует streams API для взаимодействия с asyncio.
Теперь давайте представим, что это реальное приложение, и мы хотим отправлять все события о сброшенных соединениях в службу мониторинга.
```
import asyncio
from asyncio import StreamReader, StreamWriter
# Pretend that this coroutine actually contacts an external server to
# submit event notifications.
async def send_event(msg: str):
await asyncio.sleep(1)
async def echo(reader: StreamReader, writer: StreamWriter):
print('New Connection.')
try:
while data := await reader.readline():
writer.write(data.upper())
await writer.drain()
print('Leaving connection.')
except asyncio.CancelledError:
msg = 'Connection dropped'
print(msg)
# Because the event notifier involves network access, it is common for
# such calls to be made in a separate async task; that’s why we’re
# using the create_task() function here.
asyncio.create_task(send_event(msg))
async def main(host='127.0.0.1', port=8888):
server = await asyncio.start_server(echo, host, port)
async with server:
await server.serve_forever()
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Bye!')
```
### Для чего в gather() используется значение return_exceptions=True?
1. функция run_until_complete() управляет футурой; во время завершения работы футура возвращаемается gather().
2. Если в этой футуре возникнет исключение, исключение также будет вызвано из функции run_until_complete(), в этом случае цикл остановится.
3. Если функция run_until_complete() используется группой футур, любое исключение, возникающее внутри любой из подзадач, также будет возникать в группе, если оно не обрабатывается в подзадаче. Обратите внимание, что это включает CancelledError
4. Если только некоторые задачи обрабатывают CancelledError, а другие нет, то те, которые этого не делают, приведут к остановке цикла. Это означает, что цикл будет остановлен до того, как все задачи будут выполнены.
5 Мы не хотим такого поведения. Нам нужно, чтобы функция run_until_complete() завершалась только после завершения всех задач в группе, независимо от того, вызывают ли некоторые задачи исключения.
6. Следовательно, у нас есть gather(*, return_exceptions=True): этот параметр заставляет “групповое” будущее обрабатывать исключения из подзадач как возвращаемые значения, чтобы они не всплывали и не мешали run_until_complete().
```
import asyncio
async def f(delay):
await asyncio.sleep(1 / delay)
return delay
loop = asyncio.get_event_loop()
for i in range(10):
loop.create_task(f(i))
pending = asyncio.all_tasks(loop=loop)
print(type(pending)) # execute pending tasks
# with return_exceptions=True,exception is returns as a value
group = asyncio.gather(*pending, return_exceptions=True)
results = loop.run_until_complete(group)
print(f"{results=}")
loop.close()
```
### Сигналы
Обработка сигналов в asyncio - важный момент, так как если этого не делать, то мы получим вечный цикл.
```
import asyncio
async def main():
while True:
print("<your app is running>")
await asyncio.sleep(1)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
task = loop.create_task(main())
try:
loop.run_until_complete(task)
except KeyboardInterrupt:
print("got signal SIGINT shutting down")
tasks = asyncio.all_tasks(loop=loop)
for t in tasks:
t.cancel()
group = asyncio.gather(*tasks, return_exceptions=True)
print(group)
loop.run_until_complete(group)
loop.close()
```
```
#! /usr/bin/env python
import asyncio
from signal import SIGINT, SIGTERM
# Example demonstrates how to handle both SIGINT/SIGTERM
# and perform clean shutdown, while also doing cleanup
# in the CancelledError coro termination case.
async def main():
try:
while True:
print('<Your app is running>')
await asyncio.sleep(1)
except asyncio.CancelledError:
# pretend we're doing long-running cleanup
for _ in range(3):
print('<Your app is shutting down...>')
await asyncio.sleep(1)
def handler(sig):
loop = asyncio.get_running_loop()
# This is only valid if we're not relying on
# asyncio.run - otherwise we'll get errors,
# that a loop was terminated before tasks were finished.
loop.stop()
print(f"Got signal: {sig!s}, shutting down.")
# Stop handling signals - the shutdown/cleanup procedure
# is ongoing - avoid interrupting it.
loop.remove_signal_handler(SIGTERM)
# Removing the SIGINT handler would result in
# the default SIGINT handler taking over - the default
# handler throws KeyboardInterrupt exception.
# Instead register a do-nothing handler for SIGINT.
loop.add_signal_handler(SIGINT, lambda: print("<Got SIGINT; Ignoring>"))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
for sig in (SIGTERM, SIGINT):
loop.add_signal_handler(sig, handler, sig)
loop.create_task(main())
loop.run_forever()
# cleaup
tasks = asyncio.all_tasks(loop=loop)
for t in tasks:
t.cancel()
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
```
### Ожидание Исполнителя во время выключения
Что происходит во время завершения работы, когда выполнение заданий исполнителя занимает больше времени, чем выполнение всех ожидающих Task экземпляров. Короткий ответ таков: без вмешательства вы получите ошибки как в примере ниже
```
import asyncio
import time
async def main():
loop = asyncio.get_running_loop()
loop.run_in_executor(None, blocking)
print(f'{time.ctime()} Hello')
await asyncio.sleep(1.0)
print(f'{time.ctime()} GoodBye!')
def blocking():
time.sleep(1.5)
print(f'{time.ctime()} Hello from a Thread!')
asyncio.run(main())
```
Если во время испонения нажать Ctrl+C, то мы получим ошибку RuntimeError: Event loop is closed. Почему?
run_in_executor() не создает экземпляр Task: он возвращает future.
Это означает, что он не входит в набор “активных задач”, которые отменяются внутри asyncio.run(), и поэтому run_until_complete() (вызывается внутри asyncio.run()) не ожидает завершения задачи исполнителя. Ошибка RuntimeError возникает из внутреннего вызова loop.close(), выполняемого внутри asyncio.run().
В Python 3.9 функция asyncio.run() была улучшена для корректного ожидания завершения работы исполнителя.
```
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor as Executor
async def main():
print(f'{time.ctime()} Hello!')
await asyncio.sleep(1.0)
print(f'{time.ctime()} Goodbye!')
loop.stop()
def blocking():
time.sleep(2.0)
print(f"{time.ctime()} Hello from a thread!")
loop = asyncio.get_event_loop()
executor = Executor()
loop.set_default_executor(executor)
loop.create_task(main())
future = loop.run_in_executor(None, blocking)
try:
loop.run_forever()
except KeyboardInterrupt:
print('Cancelled')
tasks = asyncio.all_tasks(loop=loop)
for t in tasks:
t.cancel()
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
executor.shutdown(wait=True)
loop.close()
```
Наконец, у нас есть стратегия с общей применимостью: вы можете вызвать run_in_executor() в любом месте, и ваша программа все равно завершит работу, даже если задания исполнителя все еще выполняются после завершения всех асинхронных задач.