### Основные понятия
Производитель(producer) - это приложение, которое отправляет сообщения.
Очередь(queue) - это буфер, в котором хранятся сообщения.
Потребитель(consumer) - это приложение, которое получает сообщения.
Обменник или точка обмена (Exchange) - в него отправляются сообщения. Exchange распределяет сообщение в одну или несколько очередей. Он маршрутизирует сообщения в очередь на основе созданных связей (bindings) между ним и очередью.
### Отправка соообщений
Отправка сообщение начинается с создания подключения к RabbitMQ с помощью блокирующего метода BlockingConnection. После создания подключения нужно создать канал и очередь в которую отправить сообщение.
```
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
```
Чтобы создать очередь или проверить, что очередь существует используем queue_declare. Если мы отправим сообщение в несуществующую очередь, RabbitMQ просто удалит сообщение.
```
channel.queue_declare(queue='hello')
```
Cообщение не может быть отправлено сразу в очередь, оно всегда должно проходить через обменник.
```
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
```
Перед выходом из програмы нужно закрыть соедение с RabbitMQ, в этот моменты сетевые буферы будут очищены и выполленан проверка, что сообщения действительно было доставлено.
```
connection.close()
```
### Получение
Для получения сообщения из очереди нужно также подключиться к серверу RabbitMQ, выбрать очередь, из которой будем получать сообщения и запустить получние сообщений.
```
channel.queue_declare(queue='hello')
```
Чтобы смотреть какие очереди есть в RabbitMQ и сколько в них сообщений в командной строке используем rabbitmqctl:
```
sudo rabbitmqctl list_queues
```
Получение сообщений из очереди работает создания функции callback функции. Всякий раз, когда мы получаем сообщение, она будет вызывана.
```
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
```
```
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
```
### Как работают очереди
Очереди работают по принипу Round-robin(каждый потребитель получает из очереди по 1 сообщению последовательно). В среднем каждый потребитель получит одинаковое количество сообщений. Такой способ распространения сообщений называется циклическим.
Плюс очередей, что можно легко масштабироваться за счет создания нескольких воркеров.
#### Подтверждение сообщения
Бывают долгие задачи, и если после получения задачи с ней что-то пойдет не так и она сломается, тогда RabbitMQ не будет знать об этом. Сообщение будет помечено для удаления и сообщение потеряется.
Чтобы гарантировать, что сообщение не будет потеряно, RabbitMQ поддерживает подтвреждение обработки сообщения. Потребитель отправляет в ответ соообщение серверу RabbitMQ, что конкретное сообщение было получено, обработано и что RabbitMQ может удалить его.
При подтверждении доставки RabbitMQ устанавливает тайм-аут (по умолчанию 30 минут). Это помогает обнаружить зависших потребителей, которые никогда не подтверждают доставку.
Ручное подтверждение сообщений включено по умолчанию. В предыдущих примерах мы явно отключали их с помощью флага auto_ack=True.
```
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.') )
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
```
Если воркер умирает, тогда задачу надо передать другому воркеру.
#### Время жизни сообщеня
Подтверждение сообщение настроили, но задачи все равно будут потеряны, если сервер RabbitMQ остановится. Чтобы убедиться, что сообщения не будут потеряны, необходимы две вещи: нам нужно пометить очередь и сообщения как долговечные.
```
channel.queue_declare(queue='hello', durable=True)
```
Такой подход не гарантирует, что сообщение не будет потеряно. Флаг сообщает RabbitMQ что нужно сохранять сообщения на диск, но все еще существует короткий интервал времени, когда RabbitMQ получил сообщение и еще не сохранил его.
RabbitMQ не выполняет fsync для каждого сообщения - оно может быть просто сохранено в кэше и на самом деле не записано на диск. Гарантии невелики, но этого достаточно для простой очереди задач. Если нужна более надежная гарантия, можно использовать publisher confirms.
#### Fair dispatch
Бывает ситуации с двумя воркерами, когда все нечетные сообщения долгие, а четные - быстрые, один воркер будет постоянно занят, а другой практически не будет выполнять никакой работы. RabbitMQ ничего об этом не знает и по-прежнему будет равномерно рассылать сообщения.
Чтобы этого избежать можно использовать метод канала Channel#basic_qos с настройкой prefetch_count=1. RabbitMQ не будет передавать более одного сообщения воркеру до тех пор, пока он не обработает и не подтвердит предыдущее.
```
channel.basic_qos(prefetch_count=1)
```
Если все воркеры заняты, тогда очередь может заполниться. Чтобы это отследить надо использовать TTL сообщений.
### Publish/Subscribe
Если нужно доставиит сообщение нескольким потребителям, тогда используется поход Publish/Subscribe.
Чтобы посмотреть обменники на сервере в консоли введите:
```
sudo rabbitmqctl list_exchanges
```
Обменник создается с помощью аргумента `exchange`
```
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
```
#### Временные очереди
Чтобы создать временную очередь с помощью пустого параметр очереди в queue_declare=''
В итоге получим случайное имя очереди. Например, это может выглядеть как amq.gen-JzTY20BRgKO-HjmUJj0wLg.
Во-вторых, как только потребительское соединение будет закрыто, очередь должна быть удалена. Для этого есть флаг exclusive=True
```
result = channel.queue_declare(queue='', exclusive=True)
```
#### Связи (binding)
Когда мы создали очередь и обменник нам нужно настроить взаимодействие между обменником и очередью это называется связью.
```
channel.queue_bind(exchange='logs',
queue=result.method.queue)
```
Чтобы посмотреть существующие связи
```
rabbitmqctl list_bindings
```
### Маршрутизация сообщений
Обменник поддерживает несколько способов работы:
1. Direct exchange — используется, когда нужно доставить сообщение в определенные очереди. Сообщение публикуется в обменник с определенным ключом маршрутизации и попадает во все очереди, которые связаны с этим обменником аналогичным ключом маршрутизации. Ключ маршрутизации — это строка. Поиск соответствия происходит при помощи проверки строк на эквивалентность.
2. Topic exchange – аналогично direct exchange дает возможность осуществления выборочной маршрутизации путем сравнения ключа маршрутизации. Но, в данном случае, ключ задается по шаблону. При создании шаблона используются 0 или более слов (буквы AZ и az и цифры 0-9), разделенных точкой, а также символы * и #.
3. Fanout exchange – все сообщения доставляются во все очереди даже если в сообщении задан ключ маршрутизации.