# Проблемы
## Управление фоновыми задачами
### Запуск и остановка Event Loop
```
import asyncio, time
async def handler(reader, writer):
writer.write(f'{time.time()}\n\r'.encode())
writer.close()
async def server():
server = await asyncio.start_server(handler, '::1', 2023)
await server.serve_forever()
async def main():
await server()
asyncio.run(main())
```
### Управление задачами
```
async def cache(interval):
""" Start updateing response cache """
async def server():
""" Start TCP server """
async def statistic():
""" Start sending usage statistic """
async def log_sender():
""" Start sending internal logs to LOG collector """
async def main():
await asyncio.gather(cache(), server(), statistic(), log_sender())
asyncio.run(main())
```
### Прототипы задач
```
async def cache(interval):
""" Start updateing response cache """
while True:
await update_global_cache()
await asyncio.sleep(interval)
async def server():
""" Start TCP server """
await passing_config_to_handler()
await create_server()
async def statistic():
""" Start sending usage statistic """
await configuring_sender()
await start_sending()
async def log_sender():
""" Start sending internal logs to LOG collector """
await configuring_sender()
await start_sending()
async def main():
await asyncio.gather(
cache(),
server(),
statistic(),
log_sender()
)
asyncio.run(main())
```
### Решение с wait
```
async def main():
done, pending = await asyncio.wait(
[
cache(),
server(),
statistic(),
log_sender(),
],
return_when=asyncio.FIRST_EXCEPTION
)
...
# Если одна задача упала, нужно отменить все остальные
asyncio.run(main())
```
### Проблема 2. Stdout блокируется
### Проблема 3. Retry policy
- Сеть работает довольно сложно, и иногда маршрутизаторы сбрасывают соединения/перезагружаются/и т.д.
- TCP соединения разрываются - это норма
Варианты решения:
1. Умный клиент. Умный клиент делает retry
2. Умный прокси. Reverse proxy делает retry
3. Умный сервис. Молча пробуем еще раз
#### Чего хочется на практике?
- Клиент не готов ждать выполнения запроса вечно.
- Обычно он не готов ждать больше секунды (ну пяти в крайнем случае)
- Требуется “вести себя прилично” и не “завалить” удаленный сервер запросами. (мы же асинхронные, и можем их много и часто запрашивать)
- Нужно понимать какие именно исключения нужно бросать сразу. Например в ответ на ошибку доступа нет смысла пытаться сделать повторный запрос.
### Блокирующие операции
ThreadPool/ProcessPool
- Блокирующие операции нужно выполнять отдельно от EventLoop, в пуле
потоков или процессов
- Чаще всего блокирующие операции отпускают GIL, и треды не ждут друг-друга, но нужно это отдельно исследовать
- Задачи которые делают много вычислений (особенно в циклах), можно вызывать в ProcessPool или использовать что-то что выполнит эту задачу оптимально (Numpy/Numba/Cython)
```
import asyncio
from concurrent.futures import ThreadPoolExecutor
loop = asyncio.new_event_loop()
loop.set_default_executor(ThreadPoolExecutor(8))
def reader(fname):
loop = asyncio.get_running_loop()
def read_file():
with open(fname, "rb") as fp:
return fp.read()
return loop.run_in_executor(None, read_file)
async def main():
print(await reader("/etc/bashrc"))
loop.run_until_complete(main())
```
## aiomisc - это библиотека с различными утилитами для asyncio
Разработала команда проекта Едадил, чтобы убрать повторяющийся код.
### Основные концепции
entrypoint - это абстаркция, как asyncio.run, но более явно.
** Запуск и остановка EventLoop **
```
import asyncio
import aiomisc
async def main():
await asyncio.sleep(1)
with aiomisc.entrypoint() as loop:
loop.run_until_complete(main())
```
```
import asyncio, time, aiomisc
async def handler(reader, writer):
writer.write(f'{time.time()}\n\r'.encode())
writer.close()
async def server():
server = await asyncio.start_server(handler, '::1', 2023)
async def main():
await server()
with aiomisc.entrypoint() as loop:
loop.create_task(main())
loop.run_forever()
```
Что произошло?
Создание и запуск event loop
- создает default thread pool
- подключаем uvloop (если установлен)
- инициализируем thread-pool
- отдаем объект в контекст
- выполняем пользовательский код в контексте
- ожидание всех блоков finally и закрытие экземпляра event loop
### Service
Это базовый клас. У которого есть два основых метода - Старт и стоп.
```
import asyncio
from aiomisc import entrypoint, Service
class MyService(Service):
async def start(self):
self.task = self.loop.create_task(asyncio.sleep(3600, loop=self.loop))
async def stop(self):
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass
```
Как запустить сервис?
```
with entrypoint(MyService()) as loop:
loop.run_forever()
```
Как запустить сервисы?
```
import asyncio
from aiomisc import entrypoint, Service
class MyService(Service):
async def start(self):
self.start_event.set()
await asyncio.sleep(3600, loop=self.loop)
s1 = MyService()
s2 = MyService()
with entrypoint(s1, s2, log_level='info') as loop:
loop.run_forever()
```
### Базовые классы сервисов
- TCPServer
- TLSServer
- UDPServer
- PeriodicService
### Готовые сервисы
- MemoryTracer
- Profiler
### Декораторы
@timeout
@asyncbackoff
Подход к проблеме таймаутов и ретрайев со стороны
пользователя
› Пользователю не важна причина по которой сервер не
отвечает
› Пользователь не хочет ждать слишком долго
### Работа с потоками
@aiomisc.threaded
@aiomisc.threaded_iterable
@aiomisc.threaded_separate
### select
### contextvars
```
var = ContextVar('var')
def foo():
var.set('egg')
return var.get()
```