# notify-service **Repository Path**: bbua_admin/notify-service ## Basic Information - **Project Name**: notify-service - **Description**: api 通知 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-01-07 - **Last Updated**: 2026-01-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # API通知系统 基于 Spring Boot 的异步通知服务,使用 Kafka 作为消息队列,实现任务的顺序判断、重试机制等功能。 ## 📋 设计文档 ### 一、需求分析 1. **对内标准化**:提供统一的内部 HTTP API 接口,让内部各业务系统无需关注外部发送细节,仅需传入「目标地址 + head + 请求内容」即可发起通知。(注意:外部系统的签名验证,业务系统自己实现) 2. **对外可靠性**:系统需保障通知尽可能可靠送达外部,需解决网络波动、目标服务不可用等场景下的发送失败问题,引入重试机制和报警机制 3. **弹性能力**:针对单位时间内大量并发通知的场景,需要从异步削峰、资源弹性等维度增强系统弹性,确保在流量洪峰下系统不崩溃、通知不丢失、发送不积压 4. **数据顺序性**:在 API 通知系统中,保证同一业务主体(如同一用户、同一订单)的多条通知,需按照业务系统调用 API 的顺序被发送到外部目标地址,避免因异步处理、并发执行导致的顺序错乱 5. **任务追踪**:全链路记录通知任务的生命周期轨迹,支持问题排查、状态监控、审计回溯。核心目标是让每个通知任务从「创建→入队→发送→回调/结果」的全流程可追溯、可查询、可分析 ### 二、整体架构与核心设计 #### 架构图 ![输入图片说明](src/main/resources/architecture-diagram.png) #### 核心模块设计 1. **请求拦截层** - 核心能力:生成全局唯一 traceId(贯穿全链路)、记录请求时间/调用方 IP 等基础日志 - 作用:为全链路追踪提供唯一标识,统一请求元数据采集 2. **接入层(Controller)** - 核心能力:提供标准化内部 HTTP API(接收目标地址、Header、请求内容、业务标识) - 入参规范: ```json { "targetUrl": "外部API地址", "headers": { "Content-Type": "application/json", "Sign": "业务系统生成的签名" }, "body": "请求内容", "businessKey": "业务标识(如用户ID/订单ID)" } ``` - 出参规范:返回 taskId 和 traceId,便于业务系统查询状态 3. **服务层(Service)** - 核心能力: 1. 接收接入层请求,将任务封装为消息 2. 发送消息到主消息队列(异步削峰,避免业务系统阻塞) 3. 保证「任务入队」与「基础信息入库」的原子性(避免消息丢失) 4. **消息队列层(主队列 + 重试队列)** - 主消息队列:承接正常待发送任务,实现异步削峰 - 重试队列:存放发送失败的任务,支持指数退避重试(如 5s→10s→20s) - 设计逻辑:主队列负责高吞吐,重试队列隔离失败任务,避免影响正常任务 5. **单调性判断层** - 核心能力:基于 businessKey 判断任务顺序(如记录同一业务主体的最新任务时间/ID) - 作用:保证同一业务主体的通知按调用顺序发送,避免「支付通知」早于「订单创建通知」的错乱 6. **HTTP 发送层** - 核心能力:根据任务中的 targetUrl、headers、body 调用外部 API - 附加逻辑:记录发送耗时、外部服务响应结果/异常信息 7. **数据持久化 + 监控告警** - 数据库:存储任务状态(成功/失败)、失败原因、traceId、重试次数等 - 监控告警:失败次数超过阈值时,触发日志打印 + 监控告警(如邮件/短信通知运维) ### 三、关键工程决策与取舍说明 | 决策点 | 选择方案 | 取舍原因 | | :----- | :------- | :------- | | 外部签名处理 | 由业务系统自行实现 | 外部 API 签名规则因供应商而异,系统不感知业务逻辑,由业务系统自主生成更灵活,与业务解耦 | | 消息队列选型 | 主队列 + 重试队列分离 | 避免失败任务阻塞主队列,重试队列独立控制重试策略,兼顾吞吐与可靠性 | | 顺序性保证 | 消费端"单调性判断" | 相比"生产端分区",消费端判断更灵活,且无需提前定义分区规则 | | 重试策略 | 指数退避 + 次数限制(默认 3 次) | 避免高频重试压垮外部服务,次数限制防止任务无限重试 | | 持久化时机 | 发送前基础信息入库 + 发送后更新状态 | 既保证任务可追溯,又避免"发送成功但状态未更新"的不一致 | | 幂等性 | 由业务系统自行实现 | 结合需求要求,幂等性由业务系统自行实现,本系统仅提供「辅助能力」,不介入具体幂等逻辑,与业务完全解耦 | ### 四、AI 使用说明 #### 1. AI 提供的关键帮助 - 架构梳理:将提供的架构图和流程图转化为标准化架构描述,明确各模块依赖关系 - 代码模板生成:快速生成 Controller、Service、Consumer 等核心代码框架,节省基础编码时间 - 技术选型建议:推荐「主队列 + 重试队列」的消息队列方案,匹配可靠性与弹性需求 - 顺序性实现思路:提供「单调性判断」的简化逻辑,解决同一业务主体的顺序问题 #### 2. 未采纳的 AI 建议 - 引入分布式事务:AI 建议用 Seata 保证「入库 + 发消息」的一致性,但 MVP 阶段采用「先入库后发消息」的本地事务,降低复杂度 - 外部签名统一处理:AI 建议系统接管外部签名,但不同供应商签名规则差异大,由业务系统自行处理更灵活 - 接入 SkyWalking 链路追踪:AI 建议集成分布式链路系统,但 MVP 阶段用 MDC+traceId 已满足基础追踪需求 #### 3. 自主决策及原因 - 外部签名由业务系统处理:系统不感知外部 API 的业务规则,避免耦合供应商逻辑 - 重试队列复用主队列消费逻辑:减少代码冗余,统一顺序判断逻辑 - 单调性判断简化实现:MVP 阶段用「最新任务 ID」判断顺序,避免引入复杂的分布式锁/状态机,降低实现成本 ## 📋 功能特性 - ✅ 任务入库持久化 - ✅ 基于 Kafka 的异步消息处理 - ✅ 业务键(businessKey)单调性判断 - ✅ 自动重试机制(最多3次) - ✅ 任务状态跟踪(PENDING/SUCCESS/FAILED) - ✅ 分布式追踪支持(traceId) ## 🏗️ 项目结构 ```text src/main/java/com/example/notify/ ├── NotifyApplication.java # 启动类 ├── config/ │ └── KafkaConfig.java # Kafka配置 ├── controller/ │ └── NotifyController.java # 接入层 ├── service/ │ └── NotifyService.java # 服务层 ├── consumer/ │ └── NotifyConsumer.java # 消费层 ├── entity/ │ └── NotifyTask.java # 任务实体 ├── repository/ │ └── NotifyTaskRepository.java # 数据访问层 └── dto/ ├── NotifyRequest.java # 请求DTO └── NotifyResponse.java # 响应DTO ``` ## 🚀 快速开始 ### 1. 前置要求 - JDK 17+ - Maven 3.6+ - Kafka 2.8+(需要先启动 Kafka 服务) ### 2. 启动 Kafka ```bash # 使用 Docker 启动 Kafka(推荐) docker-compose up -d # 或使用本地安装的 Kafka # 启动 Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # 启动 Kafka bin/kafka-server-start.sh config/server.properties ``` ### 3. 创建 Kafka Topic ```bash # 创建主队列 Topic kafka-topics.sh --create --topic notify-main-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 # 创建重试队列 Topic kafka-topics.sh --create --topic notify-retry-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 ``` ### 4. 启动应用 ```bash mvn spring-boot:run ``` ## 📡 API 接口 ### 发送通知 ```bash POST /api/notify Content-Type: application/json { "targetUrl": "https://example.com/api/callback", "headers": { "Content-Type": "application/json", "Authorization": "Bearer token123" }, "body": "{\"message\": \"test\"}", "businessKey": "order-123" } ``` **响应:** ```json { "taskId": "550e8400-e29b-41d4-a716-446655440000", "traceId": "trace-123", "message": "任务已接收" } ``` ### 查询任务状态 ```bash GET /api/notify/{taskId} ``` ## 🔄 工作流程 1. **接收请求**:Controller 接收通知请求 2. **发送消息**:将任务发送到 Kafka 主队列(`notify-main-topic`) 3. **消费处理**:Consumer 从主队列消费消息 4. **顺序判断**:检查是否为同一 businessKey 的最新任务 5. **发送通知**:调用外部 API 6. **结果处理**: - 成功:更新状态为 SUCCESS - 失败:增加重试次数,发送到重试队列(`notify-retry-topic`) 7. **重试机制**:重试队列消费后转发到主队列,最多重试3次 ## ⚙️ 配置说明 ### Kafka 配置 在 `application.properties` 中配置: ```properties spring.kafka.bootstrap-servers=localhost:9092 ``` ### 数据库配置 使用 H2 内存数据库(开发环境),生产环境可替换为 MySQL/PostgreSQL: ```properties spring.datasource.url=jdbc:h2:mem:notifydb ``` ## 🔍 关键实现 ### 1. 单调性判断 通过查询同一 `businessKey` 的最新任务ID,确保只处理最新任务: ```java String latestTaskId = taskRepo.findLatestTaskIdByBusinessKey(task.getBusinessKey()); if (!task.getTaskId().equals(latestTaskId)) { // 非最新任务,跳过处理 return; } ``` ### 2. 重试机制 失败任务自动发送到重试队列,重试队列消费后转发到主队列: ```java if (task.getRetryCount() > 3) { task.setStatus("FAILED"); } else { kafkaTemplate.send("notify-retry-topic", task.getBusinessKey(), taskJson); } ``` ### 3. 手动确认 使用手动确认模式,确保消息处理完成后再确认: ```java @KafkaListener(topics = "notify-main-topic", groupId = "notify-group") public void consumeMainQueue(@Payload String message, Acknowledgment acknowledgment) { // 处理消息 acknowledgment.acknowledge(); } ``` ## 🧪 测试 ### 使用 curl 测试 ```bash # 发送通知 curl -X POST http://localhost:8080/api/notify \ -H "Content-Type: application/json" \ -d '{ "targetUrl": "https://httpbin.org/post", "headers": { "Content-Type": "application/json" }, "body": "{\"test\": \"message\"}", "businessKey": "test-001" }' # 查询任务状态 curl http://localhost:8080/api/notify/{taskId} ``` ## 📊 监控 ### H2 控制台 访问 `http://localhost:8080/h2-console` 查看数据库: - JDBC URL: `jdbc:h2:mem:notifydb` - Username: `sa` - Password: (空) ### 查看 Kafka 消息 ```bash # 消费主队列消息 kafka-console-consumer.sh --topic notify-main-topic --bootstrap-server localhost:9092 --from-beginning # 消费重试队列消息 kafka-console-consumer.sh --topic notify-retry-topic --bootstrap-server localhost:9092 --from-beginning ``` ## 📝 注意事项 1. **Kafka 必须启动**:应用启动前需要先启动 Kafka 2. **Topic 需要创建**:首次运行需要创建 Topic 3. **手动确认**:使用手动确认模式,确保消息处理完成 4. **业务键设计**:`businessKey` 的设计要合理,确保同一业务使用相同的 key 5. **重试次数**:当前配置最多重试3次,可根据需求调整 ## 🚀 生产环境建议 1. **数据库**:替换 H2 为 MySQL/PostgreSQL 2. **连接池**:配置数据库连接池 3. **监控**:集成 Prometheus + Grafana 4. **日志**:集成 ELK 或类似日志系统 5. **Kafka 集群**:使用 Kafka 集群提高可用性 6. **事务**:考虑使用 Kafka 事务保证数据一致性