# RabbitMQ
**Repository Path**: mostbrain/rabbitmq
## Basic Information
- **Project Name**: RabbitMQ
- **Description**: SpringBoot整合RabbitMQ教程,以及搭建RabbitMQ集群说明
- **Primary Language**: Java
- **License**: WTFPL
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 2
- **Forks**: 2
- **Created**: 2024-01-05
- **Last Updated**: 2026-01-24
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# RabbitMQ教程
#### 介绍
SpringBoot整合RabbitMQ教程,以及搭建RabbitMQ集群说明
publisher:消息生产者
consumer:消息消费者
queue:队列
exchange:交换机,负责路由消息,消息先发到交换机在路由给队列
virtual-host:虚拟主机,数据隔离
###### spring-amqp配置
````
## host+端口或者addresses集群二选一
spring.rabbitmq.host=${host.ip}
spring.rabbitmq.port=5673
## OR
spring.rabbitmq.addresses=20.198.126.101:5675,20.198.126.102:5675,20.198.126.103:5675,20.198.82.9:5675,20.198.82.10:5675
#确保同一时刻只有一条消息投递给消费者,消费者没处理完不会继续投递
##处理快的多处理,处理慢的少处理
spring.rabbitmq.listener.simple.prefetch=1
## 虚拟主机
spring.rabbitmq.virtual-host=vhost@${host.ip}
## 用户名
spring.rabbitmq.username=admin
## 密码
spring.rabbitmq.password=admin@123
````
## work模型
解决消息堆积
1.一个队列绑定多个消费者,加快消费速度
2.设置spring.rabbitmq.listener.simple.prefetch=1处理快的多处理,处理慢的少处理
3.设置消费者线程数RabbitListener.concurrency
## fanout交换机(广播)
将消息路由给绑定的每一个队列
## direct交换机(定向)
将消息根据规则路由给指定队列
每一个队列消费者与交换机设置一个BindingKey(可以存在多个),若队列的BindingKey一样则作用同fanout交换机
消息生产者发送消息时指定消息的RoutingKey
exchange将消息路由给RoutingKey和BindingKey一样的队列
## topic交换机(话题)
与direct类似,区别在于RoutingKey可以是多个单词的列表,并且以.分割。
BindingKey可以使用通配符,#指代0个或多个单词,*代表一个单词
## AMQP声明式创建队列,无需在控制台手动创建
1.Queue:用于声明队列,可以用工厂类QueueBuilder构建
2.Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
3.Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
## 三个可靠性
#### 发送者可靠
###### 链接重试机制
有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
* #设置MQ的连接超时时间
spring.rabbitmq.connection-timeout=1s
* #开启超时重试机制
spring.rabbitmq.template.retry.enabled=true
* #失败后的初始等待时间
spring.rabbitmq.template.retry.initial-interval=1000ms
* #失败后下次的等待时长倍数,下次等待时长=initial-interval * multiplier
spring.rabbitmq.template.retry.multiplier=1
* #最大重试次数
spring.rabbitmq.template.retry.max-attempts=3
注意 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是
阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,
当然也可以考虑使用异步线程来执行发送消息的代码。
###### 消息确认机制
RabbitMQ了Publisher Confirm和Publisher Return两种确认机制。
开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
* 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
* 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
* 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
* 其它情况都会返回NACK,告知投递失败

在publisher这个微服务的application.yml中添加配置:
* #开启publisher confirm机制,并设置confirm类型
spring.rabbitmq.publisher-confirm-type=correlated
* #开启publisher return机制会返回错误信息,一般不需要
spring.rabbitmq.publisher-returns=true
配置说明:
这里publisher-confirm-type有三种模式可选:
none:关闭confirm机制
simple:同步阻塞等待MQ的回执消息
correlated:MQ异步回调方式返回回执消息



#### 队列可靠
###### 持久化 durable

RabbitMQ实现数据持久化包括3个方面:
* 交换机持久化
* 队列持久化
* 消息持久化
SpringAMQP默认都是持久的
###### 惰性队列
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
·消费者要消费消息时才会从磁盘中读取并加载到内存
.支持数百万条的消息存储
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

#### 消费者可靠
##### 消费者确认机制


* #none,关闭ack; manual,手动ack; auto:自动ack(抛异常会一直重试)
spring.rabbitmq.listener.simple.acknowledge-mode=auto
设置acknowledge-mode=auto,当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,
然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列
* #开启消费者失败重试
spring.rabbitmq.listener.simple.retry.enabled=true
* #初始的失败等待时长为1秒
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
* #下次失败的等待时长倍数,下次等待时长=multiplier * last-interval
spring.rabbitmq.listener.simple.retry.multiplier=1
* #最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
* #true无状态;false有状态。如果业务中包含事务,这里改为false
spring.rabbitmq.listener.simple.retry.stateless=true
##### 失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
· RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
· ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
· RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

某个配置生效后创建bean


## 延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行的任务
#### 死信交换机(麻烦)
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter) :
* ·消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
* ·消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
* ·要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。
这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

发送消息,设置TTL时间

用自带的消息转换器

#### 延迟消息插件(需要装插件)
RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。
该插件的原理是设计了一种支持延迟消息功能的交换机,
当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
将rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez文件复制到
$RABBIT_HOME/plugins目录,启用插件
````
./rabbitmq-plugins -n rabbit5675@192.168.1.128 -l enable rabbitmq_delayed_message_exchange
````
#### rabbitMQ集群创建说明
###### 一台服务器部署多个节点构建集群,要解决端口冲突问题,使用不同的配置文件。
```
RABBITMQ_CONF_ENV_FILE=${RABBITMQ_HOME}/rabbitmq-env5673.conf ./rabbitmq-server -detached
RABBITMQ_CONF_ENV_FILE=${RABBITMQ_HOME}/rabbitmq-env5674.conf ./rabbitmq-server -detached
RABBITMQ_CONF_ENV_FILE=${RABBITMQ_HOME}/rabbitmq-env5675.conf ./rabbitmq-server -detached
```
###### 停用其他节点的,注意节点名唯一@前和自定义命名,@后默认是机器名,这里建议用IP否则要在hosts配置映射关系
###### 如果不在一台机器需要cat ~/.erlang.cookie,将主节点的erlang.cookie写入其他节点机器中
```
./rabbitmqctl stop_app -n rabbit5674@127.0.0.1 -l
./rabbitmqctl stop_app -n rabbit5675@127.0.0.1 -l
```
###### 从节点加入主节点
```
./rabbitmqctl join_cluster rabbit5673@127.0.0.1 -n rabbit5674@127.0.0.1 -l
./rabbitmqctl join_cluster rabbit5673@127.0.0.1 -n rabbit5675@127.0.0.1 -l
```
###### 启用队列
```
./rabbitmqctl start_app -n rabbit5674@127.0.0.1 -l
./rabbitmqctl start_app -n rabbit5675@127.0.0.1 -l
```
###### [工具操作说明](rabbit_cmd.md)
要联系远程节点,请使用 rabbitmqctl、rabbitmq-diagnostics 和其他核心 CLI 工具的 --node (-n) 选项 接受。以下示例联系节点 rabbit@remote-host.local 以了解其状态
如果要操作远程节点需要加--node (-n) rabbit@hostname 如果开启了USE_LONGNAME还要加 -l,
```
### 后台模式启动从节点
./rabbitmq-server -detached
### 服务状态
./rabbitmq-server status
### 启用控制台插件
./rabbitmq-plugins enable rabbitmq_management
###添加用户
./rabbitmqctl add_user
###查看用户
./rabbitmqctl list_users
###修改密码
./rabbitmqctl change_password {username} {new-password}
###删除用户:
./rabbitmqctl delte_user {username}
###赋予用户管理员角色(权限一共有6种,这是管理员)
./rabbitmqctl set_user_tags {username} administrator
###添加虚拟主机
./rabbitmqctl add_vhost vhost@20.198.126.101
###确定虚拟主机所属用户
./rabbitmqctl set_permissions -p vhost@20.198.126.101 admin ".*" ".*" ".*"
###关闭节点
./rabbitmqctl shutdown
### 拉起服务
./rabbitmqctl start_app
### 停服务
./rabbitmqctl stop_app
### 加入主节点集群
./rabbitmqctl join_cluster rabbit@机器名
### 查看集群状态
./rabbitmqctl cluster_status
### 设置镜像策略
./rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
```
#### RABBITMQ_CLI_ALIASES_FILE配置别名命令
export RABBITMQ_CLI_ALIASES_FILE=/path/to/cli_aliases.conf
aliases 文件使用非常简约的 ini 样式 alias = 命令格式,用于 例:
env = environment
st = status --quiet
lp = list_parameters --quiet
lq = list_queues --quiet
lu = list_users --quiet
cs = cipher_suites --openssl-format --quiet
#### [rabbitmq-env.conf](https://www.rabbitmq.com/rabbitmq-env.conf.5.html)
配置环境变量RABBITMQ_CONF_ENV_FILE
## 注意
* rabbitMQ先将消息交给交换机,再通过routerKey找对应的队列,再把消息路由给队列。
代码声明的队列又不会主动删除绑定关系,现象就类似多个KEY绑定在一个队列上。
* ***注意已存在的交换机如果修改了绑定关系,需要先删除交换机,否则只会追加绑定关系,废弃的不会自动删除
* ***修改key是队列名也需要同步修改不然队列名一样key不一样,key作用会失效。
* ***建议不要修改消费者,直接重新创建。如果必须修改则在管理端把交换机删除
#### 幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x))。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
* 方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:
①每一条消息都生成一个唯一的id,与消息一起投递给消费者。
②消费者接收到消息后处理自己的业务,业务处理成功后将消息I心保存到数据库
③如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
* 方案二,是结合业务逻辑,基于业务本身做判断。
#### 高可用仲裁队列