### Основные понятия Производитель(producer) - это приложение, которое отправляет сообщения. Очередь(queue) - это буфер, в котором хранятся сообщения. Потребитель(consumer) - это приложение, которое получает сообщения. Обменник или точка обмена (Exchange) - в него отправляются сообщения. Exchange распределяет сообщение в одну или несколько очередей. Он маршрутизирует сообщения в очередь на основе созданных связей (bindings) между ним и очередью. ### Отправка соообщений Отправка сообщение начинается с создания подключения к RabbitMQ с помощью блокирующего метода BlockingConnection. После создания подключения нужно создать канал и очередь в которую отправить сообщение. ``` import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() ``` Чтобы создать очередь или проверить, что очередь существует используем queue_declare. Если мы отправим сообщение в несуществующую очередь, RabbitMQ просто удалит сообщение. ``` channel.queue_declare(queue='hello') ``` Cообщение не может быть отправлено сразу в очередь, оно всегда должно проходить через обменник. ``` channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') ``` Перед выходом из програмы нужно закрыть соедение с RabbitMQ, в этот моменты сетевые буферы будут очищены и выполленан проверка, что сообщения действительно было доставлено. ``` connection.close() ``` ### Получение Для получения сообщения из очереди нужно также подключиться к серверу RabbitMQ, выбрать очередь, из которой будем получать сообщения и запустить получние сообщений. ``` channel.queue_declare(queue='hello') ``` Чтобы смотреть какие очереди есть в RabbitMQ и сколько в них сообщений в командной строке используем rabbitmqctl: ``` sudo rabbitmqctl list_queues ``` Получение сообщений из очереди работает создания функции callback функции. Всякий раз, когда мы получаем сообщение, она будет вызывана. ``` def callback(ch, method, properties, body): print(" [x] Received %r" % body) ``` ``` channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback) channel.start_consuming() ``` ### Как работают очереди Очереди работают по принипу Round-robin(каждый потребитель получает из очереди по 1 сообщению последовательно). В среднем каждый потребитель получит одинаковое количество сообщений. Такой способ распространения сообщений называется циклическим. Плюс очередей, что можно легко масштабироваться за счет создания нескольких воркеров. #### Подтверждение сообщения Бывают долгие задачи, и если после получения задачи с ней что-то пойдет не так и она сломается, тогда RabbitMQ не будет знать об этом. Сообщение будет помечено для удаления и сообщение потеряется. Чтобы гарантировать, что сообщение не будет потеряно, RabbitMQ поддерживает подтвреждение обработки сообщения. Потребитель отправляет в ответ соообщение серверу RabbitMQ, что конкретное сообщение было получено, обработано и что RabbitMQ может удалить его. При подтверждении доставки RabbitMQ устанавливает тайм-аут (по умолчанию 30 минут). Это помогает обнаружить зависших потребителей, которые никогда не подтверждают доставку. Ручное подтверждение сообщений включено по умолчанию. В предыдущих примерах мы явно отключали их с помощью флага auto_ack=True. ``` def callback(ch, method, properties, body): print(" [x] Received %r" % body.decode()) time.sleep(body.count(b'.') ) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) ``` Если воркер умирает, тогда задачу надо передать другому воркеру. #### Время жизни сообщеня Подтверждение сообщение настроили, но задачи все равно будут потеряны, если сервер RabbitMQ остановится. Чтобы убедиться, что сообщения не будут потеряны, необходимы две вещи: нам нужно пометить очередь и сообщения как долговечные. ``` channel.queue_declare(queue='hello', durable=True) ``` Такой подход не гарантирует, что сообщение не будет потеряно. Флаг сообщает RabbitMQ что нужно сохранять сообщения на диск, но все еще существует короткий интервал времени, когда RabbitMQ получил сообщение и еще не сохранил его. RabbitMQ не выполняет fsync для каждого сообщения - оно может быть просто сохранено в кэше и на самом деле не записано на диск. Гарантии невелики, но этого достаточно для простой очереди задач. Если нужна более надежная гарантия, можно использовать publisher confirms. #### Fair dispatch Бывает ситуации с двумя воркерами, когда все нечетные сообщения долгие, а четные - быстрые, один воркер будет постоянно занят, а другой практически не будет выполнять никакой работы. RabbitMQ ничего об этом не знает и по-прежнему будет равномерно рассылать сообщения. Чтобы этого избежать можно использовать метод канала Channel#basic_qos с настройкой prefetch_count=1. RabbitMQ не будет передавать более одного сообщения воркеру до тех пор, пока он не обработает и не подтвердит предыдущее. ``` channel.basic_qos(prefetch_count=1) ``` Если все воркеры заняты, тогда очередь может заполниться. Чтобы это отследить надо использовать TTL сообщений. ### Publish/Subscribe Если нужно доставиит сообщение нескольким потребителям, тогда используется поход Publish/Subscribe. Чтобы посмотреть обменники на сервере в консоли введите: ``` sudo rabbitmqctl list_exchanges ``` Обменник создается с помощью аргумента `exchange` ``` channel.basic_publish(exchange='logs', routing_key='', body=message) ``` #### Временные очереди Чтобы создать временную очередь с помощью пустого параметр очереди в queue_declare='' В итоге получим случайное имя очереди. Например, это может выглядеть как amq.gen-JzTY20BRgKO-HjmUJj0wLg. Во-вторых, как только потребительское соединение будет закрыто, очередь должна быть удалена. Для этого есть флаг exclusive=True ``` result = channel.queue_declare(queue='', exclusive=True) ``` #### Связи (binding) Когда мы создали очередь и обменник нам нужно настроить взаимодействие между обменником и очередью это называется связью. ``` channel.queue_bind(exchange='logs', queue=result.method.queue) ``` Чтобы посмотреть существующие связи ``` rabbitmqctl list_bindings ``` ### Маршрутизация сообщений Обменник поддерживает несколько способов работы: 1. Direct exchange — используется, когда нужно доставить сообщение в определенные очереди. Сообщение публикуется в обменник с определенным ключом маршрутизации и попадает во все очереди, которые связаны с этим обменником аналогичным ключом маршрутизации. Ключ маршрутизации — это строка. Поиск соответствия происходит при помощи проверки строк на эквивалентность. 2. Topic exchange – аналогично direct exchange дает возможность осуществления выборочной маршрутизации путем сравнения ключа маршрутизации. Но, в данном случае, ключ задается по шаблону. При создании шаблона используются 0 или более слов (буквы AZ и az и цифры 0-9), разделенных точкой, а также символы * и #. 3. Fanout exchange – все сообщения доставляются во все очереди даже если в сообщении задан ключ маршрутизации.