# Проблемы ## Управление фоновыми задачами ### Запуск и остановка Event Loop ``` import asyncio, time async def handler(reader, writer): writer.write(f'{time.time()}\n\r'.encode()) writer.close() async def server(): server = await asyncio.start_server(handler, '::1', 2023) await server.serve_forever() async def main(): await server() asyncio.run(main()) ``` ### Управление задачами ``` async def cache(interval): """ Start updateing response cache """ async def server(): """ Start TCP server """ async def statistic(): """ Start sending usage statistic """ async def log_sender(): """ Start sending internal logs to LOG collector """ async def main(): await asyncio.gather(cache(), server(), statistic(), log_sender()) asyncio.run(main()) ``` ### Прототипы задач ``` async def cache(interval): """ Start updateing response cache """ while True: await update_global_cache() await asyncio.sleep(interval) async def server(): """ Start TCP server """ await passing_config_to_handler() await create_server() async def statistic(): """ Start sending usage statistic """ await configuring_sender() await start_sending() async def log_sender(): """ Start sending internal logs to LOG collector """ await configuring_sender() await start_sending() async def main(): await asyncio.gather( cache(), server(), statistic(), log_sender() ) asyncio.run(main()) ``` ### Решение с wait ``` async def main(): done, pending = await asyncio.wait( [ cache(), server(), statistic(), log_sender(), ], return_when=asyncio.FIRST_EXCEPTION ) ... # Если одна задача упала, нужно отменить все остальные asyncio.run(main()) ``` ### Проблема 2. Stdout блокируется ### Проблема 3. Retry policy - Сеть работает довольно сложно, и иногда маршрутизаторы сбрасывают соединения/перезагружаются/и т.д. - TCP соединения разрываются - это норма Варианты решения: 1. Умный клиент. Умный клиент делает retry 2. Умный прокси. Reverse proxy делает retry 3. Умный сервис. Молча пробуем еще раз #### Чего хочется на практике? - Клиент не готов ждать выполнения запроса вечно. - Обычно он не готов ждать больше секунды (ну пяти в крайнем случае) - Требуется “вести себя прилично” и не “завалить” удаленный сервер запросами. (мы же асинхронные, и можем их много и часто запрашивать) - Нужно понимать какие именно исключения нужно бросать сразу. Например в ответ на ошибку доступа нет смысла пытаться сделать повторный запрос. ### Блокирующие операции ThreadPool/ProcessPool - Блокирующие операции нужно выполнять отдельно от EventLoop, в пуле потоков или процессов - Чаще всего блокирующие операции отпускают GIL, и треды не ждут друг-друга, но нужно это отдельно исследовать - Задачи которые делают много вычислений (особенно в циклах), можно вызывать в ProcessPool или использовать что-то что выполнит эту задачу оптимально (Numpy/Numba/Cython) ``` import asyncio from concurrent.futures import ThreadPoolExecutor loop = asyncio.new_event_loop() loop.set_default_executor(ThreadPoolExecutor(8)) def reader(fname): loop = asyncio.get_running_loop() def read_file(): with open(fname, "rb") as fp: return fp.read() return loop.run_in_executor(None, read_file) async def main(): print(await reader("/etc/bashrc")) loop.run_until_complete(main()) ``` ## aiomisc - это библиотека с различными утилитами для asyncio Разработала команда проекта Едадил, чтобы убрать повторяющийся код. ### Основные концепции entrypoint - это абстаркция, как asyncio.run, но более явно. ** Запуск и остановка EventLoop ** ``` import asyncio import aiomisc async def main(): await asyncio.sleep(1) with aiomisc.entrypoint() as loop: loop.run_until_complete(main()) ``` ``` import asyncio, time, aiomisc async def handler(reader, writer): writer.write(f'{time.time()}\n\r'.encode()) writer.close() async def server(): server = await asyncio.start_server(handler, '::1', 2023) async def main(): await server() with aiomisc.entrypoint() as loop: loop.create_task(main()) loop.run_forever() ``` Что произошло? Создание и запуск event loop - создает default thread pool - подключаем uvloop (если установлен) - инициализируем thread-pool - отдаем объект в контекст - выполняем пользовательский код в контексте - ожидание всех блоков finally и закрытие экземпляра event loop ### Service Это базовый клас. У которого есть два основых метода - Старт и стоп. ``` import asyncio from aiomisc import entrypoint, Service class MyService(Service): async def start(self): self.task = self.loop.create_task(asyncio.sleep(3600, loop=self.loop)) async def stop(self): self.task.cancel() try: await self.task except asyncio.CancelledError: pass ``` Как запустить сервис? ``` with entrypoint(MyService()) as loop: loop.run_forever() ``` Как запустить сервисы? ``` import asyncio from aiomisc import entrypoint, Service class MyService(Service): async def start(self): self.start_event.set() await asyncio.sleep(3600, loop=self.loop) s1 = MyService() s2 = MyService() with entrypoint(s1, s2, log_level='info') as loop: loop.run_forever() ``` ### Базовые классы сервисов - TCPServer - TLSServer - UDPServer - PeriodicService ### Готовые сервисы - MemoryTracer - Profiler ### Декораторы @timeout @asyncbackoff Подход к проблеме таймаутов и ретрайев со стороны пользователя › Пользователю не важна причина по которой сервер не отвечает › Пользователь не хочет ждать слишком долго ### Работа с потоками @aiomisc.threaded @aiomisc.threaded_iterable @aiomisc.threaded_separate ### select ### contextvars ``` var = ContextVar('var') def foo(): var.set('egg') return var.get() ```