# RabbitMQ **Repository Path**: mostbrain/rabbitmq ## Basic Information - **Project Name**: RabbitMQ - **Description**: SpringBoot整合RabbitMQ教程,以及搭建RabbitMQ集群说明 - **Primary Language**: Java - **License**: WTFPL - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 2 - **Created**: 2024-01-05 - **Last Updated**: 2026-01-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RabbitMQ教程 #### 介绍 SpringBoot整合RabbitMQ教程,以及搭建RabbitMQ集群说明 publisher:消息生产者
consumer:消息消费者
queue:队列
exchange:交换机,负责路由消息,消息先发到交换机在路由给队列
virtual-host:虚拟主机,数据隔离
###### spring-amqp配置 ```` ## host+端口或者addresses集群二选一 spring.rabbitmq.host=${host.ip} spring.rabbitmq.port=5673 ## OR spring.rabbitmq.addresses=20.198.126.101:5675,20.198.126.102:5675,20.198.126.103:5675,20.198.82.9:5675,20.198.82.10:5675 #确保同一时刻只有一条消息投递给消费者,消费者没处理完不会继续投递 ##处理快的多处理,处理慢的少处理 spring.rabbitmq.listener.simple.prefetch=1 ## 虚拟主机 spring.rabbitmq.virtual-host=vhost@${host.ip} ## 用户名 spring.rabbitmq.username=admin ## 密码 spring.rabbitmq.password=admin@123 ```` ## work模型 解决消息堆积
1.一个队列绑定多个消费者,加快消费速度
2.设置spring.rabbitmq.listener.simple.prefetch=1处理快的多处理,处理慢的少处理
3.设置消费者线程数RabbitListener.concurrency
## fanout交换机(广播) 将消息路由给绑定的每一个队列 ## direct交换机(定向) 将消息根据规则路由给指定队列
每一个队列消费者与交换机设置一个BindingKey(可以存在多个),若队列的BindingKey一样则作用同fanout交换机
消息生产者发送消息时指定消息的RoutingKey
exchange将消息路由给RoutingKey和BindingKey一样的队列
## topic交换机(话题) 与direct类似,区别在于RoutingKey可以是多个单词的列表,并且以.分割。
BindingKey可以使用通配符,#指代0个或多个单词,*代表一个单词
## AMQP声明式创建队列,无需在控制台手动创建 1.Queue:用于声明队列,可以用工厂类QueueBuilder构建
2.Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
3.Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
## 三个可靠性 #### 发送者可靠 ###### 链接重试机制 有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制: * #设置MQ的连接超时时间 spring.rabbitmq.connection-timeout=1s * #开启超时重试机制 spring.rabbitmq.template.retry.enabled=true * #失败后的初始等待时间 spring.rabbitmq.template.retry.initial-interval=1000ms * #失败后下次的等待时长倍数,下次等待时长=initial-interval * multiplier spring.rabbitmq.template.retry.multiplier=1 * #最大重试次数 spring.rabbitmq.template.retry.max-attempts=3 注意 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是 阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数, 当然也可以考虑使用异步线程来执行发送消息的代码。 ###### 消息确认机制 RabbitMQ了Publisher Confirm和Publisher Return两种确认机制。 开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况: * 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功 * 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功 * 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功 * 其它情况都会返回NACK,告知投递失败 ![img.png](assets/img.png) 在publisher这个微服务的application.yml中添加配置: * #开启publisher confirm机制,并设置confirm类型 spring.rabbitmq.publisher-confirm-type=correlated * #开启publisher return机制会返回错误信息,一般不需要 spring.rabbitmq.publisher-returns=true 配置说明: 这里publisher-confirm-type有三种模式可选:
none:关闭confirm机制
simple:同步阻塞等待MQ的回执消息
correlated:MQ异步回调方式返回回执消息
![img_1.png](assets/img_1.png) ![img_2.png](assets/img_2.png) ![img_3.png](assets/img_3.png) #### 队列可靠 ###### 持久化 durable ![img_4.png](assets/img_4.png) RabbitMQ实现数据持久化包括3个方面: * 交换机持久化 * 队列持久化 * 消息持久化 SpringAMQP默认都是持久的 ###### 惰性队列 从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
·消费者要消费消息时才会从磁盘中读取并加载到内存
.支持数百万条的消息存储
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
![img_5.png](assets/img_5.png) #### 消费者可靠 ##### 消费者确认机制 ![img_6.png](assets/img_6.png) ![img_7.png](assets/img_7.png) * #none,关闭ack; manual,手动ack; auto:自动ack(抛异常会一直重试) spring.rabbitmq.listener.simple.acknowledge-mode=auto 设置acknowledge-mode=auto,当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者, 然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。 我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列 * #开启消费者失败重试 spring.rabbitmq.listener.simple.retry.enabled=true * #初始的失败等待时长为1秒 spring.rabbitmq.listener.simple.retry.initial-interval=1000ms * #下次失败的等待时长倍数,下次等待时长=multiplier * last-interval spring.rabbitmq.listener.simple.retry.multiplier=1 * #最大重试次数 spring.rabbitmq.listener.simple.retry.max-attempts=3 * #true无状态;false有状态。如果业务中包含事务,这里改为false spring.rabbitmq.listener.simple.retry.stateless=true ##### 失败消息处理策略 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
· RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
· ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
· RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
![img_8.png](assets/img_8.png) 某个配置生效后创建bean ![img_9.png](assets/img_9.png) ![img_10.png](assets/img_10.png) ## 延迟消息 延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。 延迟任务:设置在一定时间之后才执行的任务 #### 死信交换机(麻烦) 当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter) : * ·消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false * ·消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费 * ·要投递的队列消息堆积满了,最早的消息可能成为死信 如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。 这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。 ![img_13.png](assets/img_13.png) 发送消息,设置TTL时间 ![img_11.png](assets/img_11.png) 用自带的消息转换器 ![img_12.png](assets/img_12.png) #### 延迟消息插件(需要装插件) RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。 该插件的原理是设计了一种支持延迟消息功能的交换机, 当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。 将rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez文件复制到 $RABBIT_HOME/plugins目录,启用插件 ```` ./rabbitmq-plugins -n rabbit5675@192.168.1.128 -l enable rabbitmq_delayed_message_exchange ```` #### rabbitMQ集群创建说明 ###### 一台服务器部署多个节点构建集群,要解决端口冲突问题,使用不同的配置文件。 ``` RABBITMQ_CONF_ENV_FILE=${RABBITMQ_HOME}/rabbitmq-env5673.conf ./rabbitmq-server -detached RABBITMQ_CONF_ENV_FILE=${RABBITMQ_HOME}/rabbitmq-env5674.conf ./rabbitmq-server -detached RABBITMQ_CONF_ENV_FILE=${RABBITMQ_HOME}/rabbitmq-env5675.conf ./rabbitmq-server -detached ``` ###### 停用其他节点的,注意节点名唯一@前和自定义命名,@后默认是机器名,这里建议用IP否则要在hosts配置映射关系 ###### 如果不在一台机器需要cat ~/.erlang.cookie,将主节点的erlang.cookie写入其他节点机器中 ``` ./rabbitmqctl stop_app -n rabbit5674@127.0.0.1 -l ./rabbitmqctl stop_app -n rabbit5675@127.0.0.1 -l ``` ###### 从节点加入主节点 ``` ./rabbitmqctl join_cluster rabbit5673@127.0.0.1 -n rabbit5674@127.0.0.1 -l ./rabbitmqctl join_cluster rabbit5673@127.0.0.1 -n rabbit5675@127.0.0.1 -l ``` ###### 启用队列 ``` ./rabbitmqctl start_app -n rabbit5674@127.0.0.1 -l ./rabbitmqctl start_app -n rabbit5675@127.0.0.1 -l ``` ###### [工具操作说明](rabbit_cmd.md) 要联系远程节点,请使用 rabbitmqctl、rabbitmq-diagnostics 和其他核心 CLI 工具的 --node (-n) 选项 接受。以下示例联系节点 rabbit@remote-host.local 以了解其状态 如果要操作远程节点需要加--node (-n) rabbit@hostname 如果开启了USE_LONGNAME还要加 -l, ``` ### 后台模式启动从节点 ./rabbitmq-server -detached ### 服务状态 ./rabbitmq-server status ### 启用控制台插件 ./rabbitmq-plugins enable rabbitmq_management ###添加用户 ./rabbitmqctl add_user ###查看用户 ./rabbitmqctl list_users ###修改密码 ./rabbitmqctl change_password {username} {new-password} ###删除用户: ./rabbitmqctl delte_user {username} ###赋予用户管理员角色(权限一共有6种,这是管理员) ./rabbitmqctl set_user_tags {username} administrator ###添加虚拟主机 ./rabbitmqctl add_vhost vhost@20.198.126.101 ###确定虚拟主机所属用户 ./rabbitmqctl set_permissions -p vhost@20.198.126.101 admin ".*" ".*" ".*" ###关闭节点 ./rabbitmqctl shutdown ### 拉起服务 ./rabbitmqctl start_app ### 停服务 ./rabbitmqctl stop_app ### 加入主节点集群 ./rabbitmqctl join_cluster rabbit@机器名 ### 查看集群状态 ./rabbitmqctl cluster_status ### 设置镜像策略 ./rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' ``` #### RABBITMQ_CLI_ALIASES_FILE配置别名命令 export RABBITMQ_CLI_ALIASES_FILE=/path/to/cli_aliases.conf aliases 文件使用非常简约的 ini 样式 alias = 命令格式,用于 例: env = environment st = status --quiet lp = list_parameters --quiet lq = list_queues --quiet lu = list_users --quiet cs = cipher_suites --openssl-format --quiet #### [rabbitmq-env.conf](https://www.rabbitmq.com/rabbitmq-env.conf.5.html) 配置环境变量RABBITMQ_CONF_ENV_FILE ## 注意 * rabbitMQ先将消息交给交换机,再通过routerKey找对应的队列,再把消息路由给队列。 代码声明的队列又不会主动删除绑定关系,现象就类似多个KEY绑定在一个队列上。 * ***注意已存在的交换机如果修改了绑定关系,需要先删除交换机,否则只会追加绑定关系,废弃的不会自动删除 * ***修改key是队列名也需要同步修改不然队列名一样key不一样,key作用会失效。 * ***建议不要修改消费者,直接重新创建。如果必须修改则在管理端把交换机删除 #### 幂等性 幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x))。 在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。 * 方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:
①每一条消息都生成一个唯一的id,与消息一起投递给消费者。
②消费者接收到消息后处理自己的业务,业务处理成功后将消息I心保存到数据库
③如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
* 方案二,是结合业务逻辑,基于业务本身做判断。 #### 高可用仲裁队列