# 王明泉-王奎-魏鑫-黎俊廷 **Repository Path**: level-23-java/w-w-w-l ## Basic Information - **Project Name**: 王明泉-王奎-魏鑫-黎俊廷 - **Description**: 4869 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-11-05 - **Last Updated**: 2025-11-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # MessageQueue Spring Boot Starter 这是一个高性能分布式消息队列的Spring Boot Starter,支持高并发、分区队列、消息持久化等功能。 mvn deploy 命令即可部署到Maven仓库 - 主JAR包 : d:\dasanshang\MessageQueue\target\messagequeue-spring-boot-starter-1.0.0.jar - 源码JAR包 : d:\dasanshang\MessageQueue\target\messagequeue-spring-boot-starter-1.0.0-sources.jar - 文档JAR包 : d:\dasanshang\MessageQueue\target\messagequeue-spring-boot-starter-1.0.0-javadoc.jar - com.example messagequeue-spring-boot-starter 1.0.0 这个项目与RabbitMQ最为相似,主要体现在: 1. 架构设计 : - 都采用Broker架构,有中央消息代理 - 都支持多种消息模式(点对点、发布订阅) - 都有消息持久化和确认机制 2. 功能特性 : - 都支持消息路由和分发 - 都有消息确认和重试机制 - 都支持优先级消息和延迟消息 - 都有监控和管理界面 3. 使用场景 : - 都适合企业级应用集成 - 都支持复杂消息路由场景 - 都提供丰富的管理工具 优点 ## 1. 高性能无锁设计 - 无锁队列实现 :项目实现了基于CAS操作的 LockFreeQueue ,避免了传统锁机制带来的性能开销,提高了并发性能 - 分区队列设计 : PartitionedQueue 将单个队列划分为多个分区,提高并发处理能力,支持轮询和指定分区两种消息分发策略 ## 2. 多种存储模式灵活切换 - 混合存储架构 :支持文件存储、数据库存储和H2内存存储三种模式,可根据业务场景灵活选择 - 存储适配器设计 :通过适配器模式统一不同存储实现的接口,实现存储方式的热切换 ## 3. 智能消息重试与死信处理机制 - 指数退避重试策略 : RetryScheduler 实现了指数退避算法,避免消息风暴 - 死信队列处理 : DeadLetterQueueProcessor 支持死信消息的归档和删除,提供完整的消息生命周期管理 ## 4. 高并发优化架构 - 双模式生产者/消费者 :提供基础版和高并发版两种实现,高并发版支持批量处理和分区队列 - 异步处理机制 :内置线程池和异步执行器,提高消息处理吞吐量 ## 5. Spring Boot深度集成 - 自动配置机制 :通过 MQAutoConfiguration 实现零配置启动,同时支持细粒度自定义配置 - 条件化Bean创建 :根据配置动态选择合适的实现类,提供灵活的组件组合 ## 6. 监控与管理能力 - 实时统计 :提供消息处理、重试、死信等关键指标的实时统计 - Web监控界面 :内置监控界面,方便运维人员查看队列状态 ## 7. 轻量级与易部署 - Spring Boot Starter形式 :打包为标准Spring Boot Starter,通过简单的依赖引入即可使用 - 最小化依赖 :相比RabbitMQ、ActiveMQ等传统消息队列,依赖更少,部署更轻量 ## 次相似:ActiveMQ ## 一、使用场景 ### 1. 中小型应用的消息处理 这个消息队列非常适合中小型应用,特别是那些需要轻量级消息队列解决方案的场景: - 微服务架构中的服务间通信 - 异步任务处理(如邮件发送、报表生成等) - 事件驱动架构中的事件分发 - 数据同步和备份任务 ### 2. 高并发消息处理场景 项目采用了无锁队列和分区队列设计,适合需要高并发消息处理的场景: - 电商系统的订单处理 - 金融系统的交易流水处理 - 物联网设备数据收集 - 实时数据分析系统 ### 3. 多存储模式需求场景 支持文件、数据库和H2三种存储模式,适合不同需求: - 文件存储:对性能要求高,可接受数据丢失的场景 - 数据库存储:需要数据持久化和事务保证的场景 - H2存储:开发和测试环境,或轻量级应用 ### 4. 需要监控和管理的场景 提供了Web监控界面和REST API,适合需要实时监控的场景: - 运维团队需要监控消息队列状态 - 需要手动处理死信消息的场景 - 需要动态调整队列配置的场景 ## 二、最佳实践(注意事项) ### 1. 存储模式选择 - 生产环境 :建议使用数据库存储模式,确保数据持久化 - 高吞吐场景 :可考虑文件存储模式,但需配置合适的异步刷盘参数 - 测试环境 :使用H2存储模式,简化部署 ### 2. 性能优化配置 ``` # 高并发场景配置 mq.enhanced=true mq.store-type=database mq.async-flush=true mq.async-flush-interval-ms=1000 # 分区队列配置 mq.partition-count=16 mq.partition-strategy=round_robin ``` ### 3. 消息可靠性配置 ``` # 消息重试配置 mq.max-retry-count=3 mq.retry-interval-ms=30000 # 死信队列配置 mq.enable-dead-letter-queue=true mq. dead-letter-queue-name=dead_letter mq.archive-dead-messages=true ``` ### 4. 监控和运维配置 ``` # 启用监控 mq.enable-monitor=true mq.monitor-path=/monitor # 暴露所有监控端点 management.endpoints.web.exposure. include=* ``` ### 5. 消费者配置 - 合理设置拉取间隔和批量大小,避免过度消耗资源 - 实现幂等性消费,防止重复消费导致数据不一致 - 及时处理消费失败的消息,避免消息积压 ### 6. 生产者配置 - 根据业务需求选择同步或异步发送 - 合理设置消息优先级,确保重要消息优先处理 - 批量发送消息时,控制批次大小,避免内存溢出 ## 三、方案的局限性 ### 1. 分布式能力有限 - 当前实现主要针对单机环境,缺乏真正的分布式协调机制 - 没有集群支持和故障转移能力 - 跨节点消息同步和一致性保证不足 ### 2. 性能瓶颈 - 文件存储模式在高并发下可能成为I/O瓶颈 - 数据库存储模式受限于数据库性能 - 内存使用量随消息量线性增长,没有有效的内存管理机制 ### 3. 功能局限性 - 缺乏消息路由和复杂路由策略 - 事务消息支持有限 - 没有延迟消息和定时消息功能 - 消息过滤能力较弱 ### 4. 可靠性限制 - 文件存储模式下,异步刷盘可能导致数据丢失 - 没有完善的备份和恢复机制 - 缺乏消息追踪和审计功能 ### 5. 运维复杂性 - 监控指标相对简单,缺乏深度分析能力 - 没有自动化运维工具和脚本 - 故障诊断和问题排查工具不足 ### 6. 兼容性限制 - 与主流消息队列(如RabbitMQ、Kafka)的协议不兼容 - 缺乏标准化的客户端SDK - 跨语言支持有限 ## 总结 这个消息队列项目是一个轻量级、易集成的Spring Boot Starter,特别适合中小型应用和快速原型开发。它的主要优势在于简单易用、与Spring生态深度集成、支持多种存储模式以及提供基本的监控能力。 然而,对于大型分布式系统或对可靠性、性能有极高要求的场景,建议考虑更成熟的消息队列解决方案,如RabbitMQ、Kafka或RocketMQ。在选择此方案时,应充分评估其局限性,并确保符合业务需求。 ## 消息持久化场景 ### 1. 数据库持久化场景 该项目通过数据库实现消息持久化,主要适用于以下场景: - 可靠性要求高的系统 :需要确保消息即使在系统崩溃后也能恢复 - 需要审计和追溯的业务 :如金融交易、订单处理等需要完整记录消息历史 - 分布式环境下的数据一致性 :通过数据库事务保证消息处理的原子性 - 需要复杂查询和分析的场景 :如消息统计、性能分析等 ### 2. 持久化实现方式 项目提供了两种持久化方式: 1. 数据库持久化 (默认配置): - 使用MyBatis + 数据库(MySQL/H2)存储消息 - 消息表结构完整,包含ID、队列名、内容、状态等字段 - 支持索引优化查询性能 2. 文件持久化 : - 按消息状态分目录存储(pending/consumed/retry/dead) - 支持同步/异步刷盘机制 - 适合对性能要求高但可靠性要求相对较低的场景 ## 最佳实践(注意事项) ### 1. 数据库配置优化 - 索引设计 : ``` CREATE INDEX idx_queue_status ON  mq_message(queue_name, status); CREATE INDEX idx_message_id ON  mq_message(message_id); CREATE INDEX idx_create_time ON  mq_message(create_time); CREATE INDEX idx_next_retry_time  ON mq_message(next_retry_time); ``` 这些索引对消息查询性能至关重要,特别是按队列和状态查询的场景 - 死信队列分离 : ``` CREATE TABLE IF NOT EXISTS  mq_dead_message (     dead_id BIGINT IDENTITY      PRIMARY KEY,     message_id VARCHAR(64) NOT      NULL UNIQUE,     queue_name VARCHAR(64) NOT      NULL,     ... ); ``` 将死信消息单独存储,避免影响正常队列性能 ### 2. 消息状态管理 - 状态流转设计 : - PENDING → PROCESSING → CONSUMED/FAILED - FAILED → RETRY → (重试多次后) → DEAD - 确保状态转换逻辑清晰,避免消息丢失 - 重试机制配置 : ``` mq.max-retry-count=3 mq.retry-interval-ms=30000 ``` 合理设置重试次数和间隔,平衡可靠性和系统资源消耗 ### 3. 性能优化 - 批量操作 : - 使用批量插入减少数据库IO: batchSave() - 批量更新状态: batchUpdateStatus() - 批量拉取消息: pull(queueName, batchSize) - 异步处理 : ``` custom.mq. consumer-pull-interval=10000 custom.mq. consumer-pull-batch-size=1 ``` 根据业务负载调整拉取间隔和批量大小 ### 4. 数据清理策略 - 消息过期处理 : ``` mq. message-expire-time-ms=86400000   # 24小时 mq.archive-dead-messages=true ``` 定期清理过期消息,防止数据库无限增长 - 死信消息处理 : ``` mq.enable-dead-letter-queue=true mq. dead-letter-process-interval-ms=6 0000 ``` 定期处理死信消息,避免堆积 ### 5. 监控与告警 - 关键指标监控 : - 待处理消息数量: getPendingCount() - 重试消息数量: getRetryCount() - 死信消息数量: getDeadCount() - 消息处理延迟 - 告警阈值设置 : - 待处理消息超过阈值 - 死信消息比例过高 - 消息处理延迟过大 ## 方案局限性 ### 1. 性能瓶颈 - 数据库IO限制 : - 高并发写入场景下,数据库可能成为瓶颈 - 相比内存队列,持久化方案延迟更高 - 批量操作可以缓解但无法完全消除 - 查询性能限制 : - 复杂查询(如按内容搜索)性能较差 - 大数据量下,即使有索引也可能存在性能问题 ### 2. 可靠性限制 - 单点故障风险 : - 数据库成为单点,需要额外的高可用配置 - 文件持久化方案对磁盘故障敏感 - 事务范围限制 : - 消息处理与业务操作可能不在同一事务中 - 需要额外机制保证最终一致性 ### 3. 扩展性限制 - 水平扩展困难 : - 数据库分库分表复杂度高 - 文件持久化方案难以分布式部署 - 存储容量限制 : - 消息历史数据会占用大量存储空间 - 需要定期清理策略,但清理本身也是性能负担 ### 4. 功能局限性 - 消息顺序保证有限 : - 单队列内可保证FIFO,但跨队列无法保证顺序 - 分布式环境下顺序保证更加复杂 - 消息路由能力有限 : - 缺乏复杂路由规则支持 - 不支持基于内容的路由 ### 5. 运维复杂性 - 数据库维护成本 : - 需要定期备份、优化、监控 - 数据库版本升级可能影响系统稳定性 - 故障恢复复杂 : - 数据库故障恢复时间长 - 部分消息可能需要手动干预处理 ## 总结 该消息队列的持久化方案适合中小型应用、对可靠性有一定要求但并发量不高的场景。对于高并发、大规模分布式系统,建议考虑使用成熟的消息中间件如RabbitMQ、Kafka等,它们提供了更完善的持久化机制、集群支持和运维工具。 在实际应用中,可以根据业务需求选择合适的持久化策略: - 对可靠性要求极高的场景:选择数据库持久化 - 对性能要求高但可容忍少量数据丢失的场景:选择文件持久化+异步刷盘 - 混合场景:重要消息使用数据库持久化,普通消息使用文件持久化 与ActiveMQ的相似之处: 1. Java生态集成 : - 都是纯Java实现 - 都与Spring生态系统深度集成 - 都提供JMS兼容接口 2. 消息模型 : - 都支持队列和主题两种模式 - 都支持事务消息 - 都有消息分组功能 ## 功能特性 - 高性能消息队列实现 - 支持文件和数据库两种存储方式 - 支持分区队列,提高并发处理能力 - 支持消息优先级 - 支持消息过期和死信队列 - 支持消息重试机制 - 提供Web监控界面 - 支持Spring Boot自动配置 ## 快速开始 ### 1. 添加依赖 在你的Spring Boot项目的`pom.xml`中添加以下依赖: ```xml com.example messagequeue-spring-boot-starter 1.0.0 ``` ### 2. 配置应用 在`application.properties`或`application.yml`中添加配置: ```properties # 启用消息队列 mq.enabled=true # 存储类型:file 或 database mq.store-type=file # 消息存储路径(当store-type为file时) mq.store-path=./mq_data # 启用Web监控界面 mq.enable-monitor=true # 监控界面路径 mq.monitor-path=/monitor # 是否启用异步刷盘 mq.async-flush=true # 异步刷盘间隔(毫秒) mq.async-flush-interval-ms=1000 # 消息重试次数 mq.max-retry-count=3 # 消息过期时间(毫秒),默认为24小时 mq.message-expire-time-ms=86400000 # 是否启用死信队列 mq.enable-dead-letter-queue=true # 死信队列名称 mq.dead-letter-queue-name=dead_letter # 高并发模式配置 mq.high-concurrency.enabled=true mq.high-concurrency.worker-threads=8 mq.high-concurrency.batch-size=100 mq.high-concurrency.batch-timeout-ms=1000 mq.high-concurrency.partition-count=4 ``` ### 3. 使用消息队列 在你的代码中注入`IMessageQueue`接口: ```java @Service public class MessageService { @Autowired private IMessageQueue messageQueue; public void sendMessage(String topic, String content) { Message message = new Message(topic, content); boolean success = messageQueue.sendMessage(message); if (success) { System.out.println("消息发送成功"); } else { System.out.println("消息发送失败"); } } public Message receiveMessage(String queueName) { return messageQueue.receiveMessage(queueName); } } ``` ### 4. 使用REST API 启动应用后,可以通过以下REST API发送和接收消息: - 发送消息:`POST /producer/send?topic=&content=` - 异步发送消息:`POST /producer/sendAsync?topic=&content=` - 批量发送消息:`POST /producer/sendBatch?topic=&content=&count=` - 发送优先级消息:`POST /producer/sendPriority?topic=&content=&priority=` - 接收消息:`GET /consumer/receive?queueName=` ### 5. 监控界面 如果启用了监控界面,可以通过浏览器访问`http://localhost:8080/monitor`查看消息队列的运行状态。 ## 高级配置 ### 数据库存储配置 如果选择使用数据库存储,需要配置数据源: ```properties # 数据源配置 spring.datasource.url=jdbc:mysql://localhost:3306/messagequeue spring.datasource.username=root spring.datasource.password=password spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver # JPA配置 spring.jpa.hibernate.ddl-auto=update spring.jpa.show-sql=true spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect ``` ### 自定义配置 你也可以通过Java配置来自定义消息队列: ```java @Configuration public class MessageQueueConfig { @Bean @ConditionalOnMissingBean public MessageQueueProperties messageQueueProperties() { return new MessageQueueProperties(); } @Bean @ConditionalOnMissingBean public IMessageQueue messageQueue(MessageQueueProperties properties) { // 自定义消息队列实现 return new CustomMessageQueue(properties); } } ``` ## 示例项目 参考`mq-client-example`目录下的示例项目,了解如何在实际项目中使用MessageQueue Spring Boot Starter。 >>>>>>> origin/main