# 王明泉-王奎-魏鑫-黎俊廷
**Repository Path**: level-23-java/w-w-w-l
## Basic Information
- **Project Name**: 王明泉-王奎-魏鑫-黎俊廷
- **Description**: 4869
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-11-05
- **Last Updated**: 2025-11-20
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# MessageQueue Spring Boot Starter
这是一个高性能分布式消息队列的Spring Boot Starter,支持高并发、分区队列、消息持久化等功能。
mvn deploy 命令即可部署到Maven仓库
- 主JAR包 : d:\dasanshang\MessageQueue\target\messagequeue-spring-boot-starter-1.0.0.jar
- 源码JAR包 : d:\dasanshang\MessageQueue\target\messagequeue-spring-boot-starter-1.0.0-sources.jar
- 文档JAR包 : d:\dasanshang\MessageQueue\target\messagequeue-spring-boot-starter-1.0.0-javadoc.jar
-
com.example
messagequeue-spring-boot-starter
1.0.0
这个项目与RabbitMQ最为相似,主要体现在:
1. 架构设计 :
- 都采用Broker架构,有中央消息代理
- 都支持多种消息模式(点对点、发布订阅)
- 都有消息持久化和确认机制
2. 功能特性 :
- 都支持消息路由和分发
- 都有消息确认和重试机制
- 都支持优先级消息和延迟消息
- 都有监控和管理界面
3. 使用场景 :
- 都适合企业级应用集成
- 都支持复杂消息路由场景
- 都提供丰富的管理工具
优点 ## 1. 高性能无锁设计
- 无锁队列实现 :项目实现了基于CAS操作的 LockFreeQueue ,避免了传统锁机制带来的性能开销,提高了并发性能
- 分区队列设计 : PartitionedQueue 将单个队列划分为多个分区,提高并发处理能力,支持轮询和指定分区两种消息分发策略
## 2. 多种存储模式灵活切换
- 混合存储架构 :支持文件存储、数据库存储和H2内存存储三种模式,可根据业务场景灵活选择
- 存储适配器设计 :通过适配器模式统一不同存储实现的接口,实现存储方式的热切换
## 3. 智能消息重试与死信处理机制
- 指数退避重试策略 : RetryScheduler 实现了指数退避算法,避免消息风暴
- 死信队列处理 : DeadLetterQueueProcessor 支持死信消息的归档和删除,提供完整的消息生命周期管理
## 4. 高并发优化架构
- 双模式生产者/消费者 :提供基础版和高并发版两种实现,高并发版支持批量处理和分区队列
- 异步处理机制 :内置线程池和异步执行器,提高消息处理吞吐量
## 5. Spring Boot深度集成
- 自动配置机制 :通过 MQAutoConfiguration 实现零配置启动,同时支持细粒度自定义配置
- 条件化Bean创建 :根据配置动态选择合适的实现类,提供灵活的组件组合
## 6. 监控与管理能力
- 实时统计 :提供消息处理、重试、死信等关键指标的实时统计
- Web监控界面 :内置监控界面,方便运维人员查看队列状态
## 7. 轻量级与易部署
- Spring Boot Starter形式 :打包为标准Spring Boot Starter,通过简单的依赖引入即可使用
- 最小化依赖 :相比RabbitMQ、ActiveMQ等传统消息队列,依赖更少,部署更轻量
## 次相似:ActiveMQ
## 一、使用场景
### 1. 中小型应用的消息处理
这个消息队列非常适合中小型应用,特别是那些需要轻量级消息队列解决方案的场景:
- 微服务架构中的服务间通信
- 异步任务处理(如邮件发送、报表生成等)
- 事件驱动架构中的事件分发
- 数据同步和备份任务
### 2. 高并发消息处理场景
项目采用了无锁队列和分区队列设计,适合需要高并发消息处理的场景:
- 电商系统的订单处理
- 金融系统的交易流水处理
- 物联网设备数据收集
- 实时数据分析系统
### 3. 多存储模式需求场景
支持文件、数据库和H2三种存储模式,适合不同需求:
- 文件存储:对性能要求高,可接受数据丢失的场景
- 数据库存储:需要数据持久化和事务保证的场景
- H2存储:开发和测试环境,或轻量级应用
### 4. 需要监控和管理的场景
提供了Web监控界面和REST API,适合需要实时监控的场景:
- 运维团队需要监控消息队列状态
- 需要手动处理死信消息的场景
- 需要动态调整队列配置的场景
## 二、最佳实践(注意事项)
### 1. 存储模式选择
- 生产环境 :建议使用数据库存储模式,确保数据持久化
- 高吞吐场景 :可考虑文件存储模式,但需配置合适的异步刷盘参数
- 测试环境 :使用H2存储模式,简化部署
### 2. 性能优化配置
```
# 高并发场景配置
mq.enhanced=true
mq.store-type=database
mq.async-flush=true
mq.async-flush-interval-ms=1000
# 分区队列配置
mq.partition-count=16
mq.partition-strategy=round_robin
```
### 3. 消息可靠性配置
```
# 消息重试配置
mq.max-retry-count=3
mq.retry-interval-ms=30000
# 死信队列配置
mq.enable-dead-letter-queue=true
mq.
dead-letter-queue-name=dead_letter
mq.archive-dead-messages=true
```
### 4. 监控和运维配置
```
# 启用监控
mq.enable-monitor=true
mq.monitor-path=/monitor
# 暴露所有监控端点
management.endpoints.web.exposure.
include=*
```
### 5. 消费者配置
- 合理设置拉取间隔和批量大小,避免过度消耗资源
- 实现幂等性消费,防止重复消费导致数据不一致
- 及时处理消费失败的消息,避免消息积压
### 6. 生产者配置
- 根据业务需求选择同步或异步发送
- 合理设置消息优先级,确保重要消息优先处理
- 批量发送消息时,控制批次大小,避免内存溢出
## 三、方案的局限性
### 1. 分布式能力有限
- 当前实现主要针对单机环境,缺乏真正的分布式协调机制
- 没有集群支持和故障转移能力
- 跨节点消息同步和一致性保证不足
### 2. 性能瓶颈
- 文件存储模式在高并发下可能成为I/O瓶颈
- 数据库存储模式受限于数据库性能
- 内存使用量随消息量线性增长,没有有效的内存管理机制
### 3. 功能局限性
- 缺乏消息路由和复杂路由策略
- 事务消息支持有限
- 没有延迟消息和定时消息功能
- 消息过滤能力较弱
### 4. 可靠性限制
- 文件存储模式下,异步刷盘可能导致数据丢失
- 没有完善的备份和恢复机制
- 缺乏消息追踪和审计功能
### 5. 运维复杂性
- 监控指标相对简单,缺乏深度分析能力
- 没有自动化运维工具和脚本
- 故障诊断和问题排查工具不足
### 6. 兼容性限制
- 与主流消息队列(如RabbitMQ、Kafka)的协议不兼容
- 缺乏标准化的客户端SDK
- 跨语言支持有限
## 总结
这个消息队列项目是一个轻量级、易集成的Spring Boot Starter,特别适合中小型应用和快速原型开发。它的主要优势在于简单易用、与Spring生态深度集成、支持多种存储模式以及提供基本的监控能力。
然而,对于大型分布式系统或对可靠性、性能有极高要求的场景,建议考虑更成熟的消息队列解决方案,如RabbitMQ、Kafka或RocketMQ。在选择此方案时,应充分评估其局限性,并确保符合业务需求。
## 消息持久化场景
### 1. 数据库持久化场景
该项目通过数据库实现消息持久化,主要适用于以下场景:
- 可靠性要求高的系统 :需要确保消息即使在系统崩溃后也能恢复
- 需要审计和追溯的业务 :如金融交易、订单处理等需要完整记录消息历史
- 分布式环境下的数据一致性 :通过数据库事务保证消息处理的原子性
- 需要复杂查询和分析的场景 :如消息统计、性能分析等
### 2. 持久化实现方式
项目提供了两种持久化方式:
1. 数据库持久化 (默认配置):
- 使用MyBatis + 数据库(MySQL/H2)存储消息
- 消息表结构完整,包含ID、队列名、内容、状态等字段
- 支持索引优化查询性能
2. 文件持久化 :
- 按消息状态分目录存储(pending/consumed/retry/dead)
- 支持同步/异步刷盘机制
- 适合对性能要求高但可靠性要求相对较低的场景
## 最佳实践(注意事项)
### 1. 数据库配置优化
- 索引设计 :
```
CREATE INDEX idx_queue_status ON
mq_message(queue_name, status);
CREATE INDEX idx_message_id ON
mq_message(message_id);
CREATE INDEX idx_create_time ON
mq_message(create_time);
CREATE INDEX idx_next_retry_time
ON mq_message(next_retry_time);
```
这些索引对消息查询性能至关重要,特别是按队列和状态查询的场景
- 死信队列分离 :
```
CREATE TABLE IF NOT EXISTS
mq_dead_message (
dead_id BIGINT IDENTITY
PRIMARY KEY,
message_id VARCHAR(64) NOT
NULL UNIQUE,
queue_name VARCHAR(64) NOT
NULL,
...
);
```
将死信消息单独存储,避免影响正常队列性能
### 2. 消息状态管理
- 状态流转设计 :
- PENDING → PROCESSING → CONSUMED/FAILED
- FAILED → RETRY → (重试多次后) → DEAD
- 确保状态转换逻辑清晰,避免消息丢失
- 重试机制配置 :
```
mq.max-retry-count=3
mq.retry-interval-ms=30000
```
合理设置重试次数和间隔,平衡可靠性和系统资源消耗
### 3. 性能优化
- 批量操作 :
- 使用批量插入减少数据库IO: batchSave()
- 批量更新状态: batchUpdateStatus()
- 批量拉取消息: pull(queueName, batchSize)
- 异步处理 :
```
custom.mq.
consumer-pull-interval=10000
custom.mq.
consumer-pull-batch-size=1
```
根据业务负载调整拉取间隔和批量大小
### 4. 数据清理策略
- 消息过期处理 :
```
mq.
message-expire-time-ms=86400000
# 24小时
mq.archive-dead-messages=true
```
定期清理过期消息,防止数据库无限增长
- 死信消息处理 :
```
mq.enable-dead-letter-queue=true
mq.
dead-letter-process-interval-ms=6
0000
```
定期处理死信消息,避免堆积
### 5. 监控与告警
- 关键指标监控 :
- 待处理消息数量: getPendingCount()
- 重试消息数量: getRetryCount()
- 死信消息数量: getDeadCount()
- 消息处理延迟
- 告警阈值设置 :
- 待处理消息超过阈值
- 死信消息比例过高
- 消息处理延迟过大
## 方案局限性
### 1. 性能瓶颈
- 数据库IO限制 :
- 高并发写入场景下,数据库可能成为瓶颈
- 相比内存队列,持久化方案延迟更高
- 批量操作可以缓解但无法完全消除
- 查询性能限制 :
- 复杂查询(如按内容搜索)性能较差
- 大数据量下,即使有索引也可能存在性能问题
### 2. 可靠性限制
- 单点故障风险 :
- 数据库成为单点,需要额外的高可用配置
- 文件持久化方案对磁盘故障敏感
- 事务范围限制 :
- 消息处理与业务操作可能不在同一事务中
- 需要额外机制保证最终一致性
### 3. 扩展性限制
- 水平扩展困难 :
- 数据库分库分表复杂度高
- 文件持久化方案难以分布式部署
- 存储容量限制 :
- 消息历史数据会占用大量存储空间
- 需要定期清理策略,但清理本身也是性能负担
### 4. 功能局限性
- 消息顺序保证有限 :
- 单队列内可保证FIFO,但跨队列无法保证顺序
- 分布式环境下顺序保证更加复杂
- 消息路由能力有限 :
- 缺乏复杂路由规则支持
- 不支持基于内容的路由
### 5. 运维复杂性
- 数据库维护成本 :
- 需要定期备份、优化、监控
- 数据库版本升级可能影响系统稳定性
- 故障恢复复杂 :
- 数据库故障恢复时间长
- 部分消息可能需要手动干预处理
## 总结
该消息队列的持久化方案适合中小型应用、对可靠性有一定要求但并发量不高的场景。对于高并发、大规模分布式系统,建议考虑使用成熟的消息中间件如RabbitMQ、Kafka等,它们提供了更完善的持久化机制、集群支持和运维工具。
在实际应用中,可以根据业务需求选择合适的持久化策略:
- 对可靠性要求极高的场景:选择数据库持久化
- 对性能要求高但可容忍少量数据丢失的场景:选择文件持久化+异步刷盘
- 混合场景:重要消息使用数据库持久化,普通消息使用文件持久化
与ActiveMQ的相似之处:
1. Java生态集成 :
- 都是纯Java实现
- 都与Spring生态系统深度集成
- 都提供JMS兼容接口
2. 消息模型 :
- 都支持队列和主题两种模式
- 都支持事务消息
- 都有消息分组功能
## 功能特性
- 高性能消息队列实现
- 支持文件和数据库两种存储方式
- 支持分区队列,提高并发处理能力
- 支持消息优先级
- 支持消息过期和死信队列
- 支持消息重试机制
- 提供Web监控界面
- 支持Spring Boot自动配置
## 快速开始
### 1. 添加依赖
在你的Spring Boot项目的`pom.xml`中添加以下依赖:
```xml
com.example
messagequeue-spring-boot-starter
1.0.0
```
### 2. 配置应用
在`application.properties`或`application.yml`中添加配置:
```properties
# 启用消息队列
mq.enabled=true
# 存储类型:file 或 database
mq.store-type=file
# 消息存储路径(当store-type为file时)
mq.store-path=./mq_data
# 启用Web监控界面
mq.enable-monitor=true
# 监控界面路径
mq.monitor-path=/monitor
# 是否启用异步刷盘
mq.async-flush=true
# 异步刷盘间隔(毫秒)
mq.async-flush-interval-ms=1000
# 消息重试次数
mq.max-retry-count=3
# 消息过期时间(毫秒),默认为24小时
mq.message-expire-time-ms=86400000
# 是否启用死信队列
mq.enable-dead-letter-queue=true
# 死信队列名称
mq.dead-letter-queue-name=dead_letter
# 高并发模式配置
mq.high-concurrency.enabled=true
mq.high-concurrency.worker-threads=8
mq.high-concurrency.batch-size=100
mq.high-concurrency.batch-timeout-ms=1000
mq.high-concurrency.partition-count=4
```
### 3. 使用消息队列
在你的代码中注入`IMessageQueue`接口:
```java
@Service
public class MessageService {
@Autowired
private IMessageQueue messageQueue;
public void sendMessage(String topic, String content) {
Message message = new Message(topic, content);
boolean success = messageQueue.sendMessage(message);
if (success) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
}
public Message receiveMessage(String queueName) {
return messageQueue.receiveMessage(queueName);
}
}
```
### 4. 使用REST API
启动应用后,可以通过以下REST API发送和接收消息:
- 发送消息:`POST /producer/send?topic=&content=`
- 异步发送消息:`POST /producer/sendAsync?topic=&content=`
- 批量发送消息:`POST /producer/sendBatch?topic=&content=&count=`
- 发送优先级消息:`POST /producer/sendPriority?topic=&content=&priority=`
- 接收消息:`GET /consumer/receive?queueName=`
### 5. 监控界面
如果启用了监控界面,可以通过浏览器访问`http://localhost:8080/monitor`查看消息队列的运行状态。
## 高级配置
### 数据库存储配置
如果选择使用数据库存储,需要配置数据源:
```properties
# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/messagequeue
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# JPA配置
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect
```
### 自定义配置
你也可以通过Java配置来自定义消息队列:
```java
@Configuration
public class MessageQueueConfig {
@Bean
@ConditionalOnMissingBean
public MessageQueueProperties messageQueueProperties() {
return new MessageQueueProperties();
}
@Bean
@ConditionalOnMissingBean
public IMessageQueue messageQueue(MessageQueueProperties properties) {
// 自定义消息队列实现
return new CustomMessageQueue(properties);
}
}
```
## 示例项目
参考`mq-client-example`目录下的示例项目,了解如何在实际项目中使用MessageQueue Spring Boot Starter。
>>>>>>> origin/main