Очередь RabbitMQ позволяет построить различные варианты взаимодействия между сервисами. Наиболее популярные из них приведены ниже.
Producer - отправляет сообщения.
Queue - хранит сообщения. Некоторые свойства:
Durable
- определение очереди сохраняется на диск. Позволяет очереди пережить перезапуск RabbitMQ.
Acknowledgment
- consumer должен вручную подтверждать обработку сообщения.
Не подтвержденные сообщения при падении consumer будут отправлены другим consumer.
Максимальное время обработки сообщения 30 мин.
Consumer - получает сообщения.
Exchange - принимает сообщения от Producer и перенаправляет в Queue.
Binding
- связывает exchange и queue. Смысл binding key зависти от типа Exchange. Варианты binding type:
fanout
- сообщение получат все consumers
direct
- сообщение получат consumer, у которых binding key идентичен routing key сообщения.
topic
- сообщение получат consumer, у которых binding key похож на routing key сообщения.
Message properties
- протокол AMPQ позволяет передавать дополнительные параметры вместе с сообщением. Некоторые свойства:
Persistent - сохранять сообщение на диск.
Позволяет сообщениям пережить перезапуск RabbitMQ.
Сообщение может потеряться за короткое время, пока RabbitMQ сохраняет его на диск.
DeliveryMode
- тоже самое, что и persistent (0 = non persistent, 1 = persistent)
ContentType
- описывает mime-type сообщения. По умолчанию application/json.
ReplayTo
- передать имя очереди для получения результата обработки сообщения.
CorrelationId
- связывает запрос и ответ при обработке сообщения в стиле RPC.
1. Simple Queue
Простейшая настройка очереди.
Exchange = empty
Routing Key = queue name
BasicConsumeAsync
Consumer
2. Worker Queue
Распределяет задачи между обработчиками.
Exchange = empty
Routing Key = queue name
autoAck = false
BasicQosAsync
BasicAckAsync
По умолчанию сообщения распределяются методом round-robin,
когда каждое новое сообщение получает следующий по списку Consumer.
Независимо обработал ли Consumer предыдущие сообщения.
Метод BasicQos позволяет выровнять нагрузку между Consumers. Сообщение передается на обработку Consumer,
только если он обработал все предыдущие.
Consumer A
Consumer B
3. Publish/Subscribe
Отправка сообщения нескольким подписчикам.
Exchange type = fanout
Exchange = exchange name
Routing Key = empty
ExchangeDeclareAsync
QueueBindAsync
Fanout exchange пробрасывает все полученные сообщения во все известные очереди
channel.QueueDeclareAsync создает очередь со случайным именем. Что бы не обрабатывать историю сообщений,
можно создать новую пустую очередь.
Consumer A
Consumer B
4. Routing
Выборочное получение сообщений.
Exchange type = direct
Routing Key = queue binding key
Binding Key = message routing key
Direct exchange пробрасывает сообщения очередь, если queue binding key совпадает с message routing key сообщения
Consumer A
Consumer B
5. Topics
Получение сообщений по шаблону.
Exchange type = topic
Routing Key = words delimited by dot
Binding Key = same as routing key
Topic exchange повторяет поведение Direct exchange за исключением:
* (star) заменяет ровно одно слово
# (hash) заменяет одно слово или ничего
Consumer A
Consumer B
6. RPC
Шаблон запрос/ответ.
Exchange = empty
CorrelationId
ReplayTo
Одна очередь для отправки запроса. Вторая очередь для получения результата.