# pcdc **Repository Path**: frostforest/pcdc ## Basic Information - **Project Name**: pcdc - **Description**: Poorman's Change Data Capture For MySQL - **Primary Language**: Java - **License**: MIT - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 8 - **Forks**: 0 - **Created**: 2025-07-12 - **Last Updated**: 2025-08-31 ## Categories & Tags **Categories**: dbmanager **Tags**: cdc, MySQL, Java ## README # PCDC-MySQL [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) PCDC-MySQL是一个轻量级的MySQL变更数据捕获(CDC)工具,用于实时监控和处理数据库变更。它基于变更时间戳工作,定期扫描目标表的复合索引(updated_at,id),并将变更的数据传递给用户定义的处理器。 ## 特性 - **可靠**:使用进度追踪表记录处理进度,确保数据变更不会遗漏,高度可靠;重启的时候可以从上一次的进度继续处理 - **并发安全**:可以在分布式环境使用,不需要额外加锁 - **秒级延迟**:秒级以上的延迟,低频变更的数据监控消耗可忽略不计;高负载且数据高频变更的情况下对TPS,QPS,平均响应时间,最大响应时间等关键性能指标影响不到2%[基准测试报告](bench_test.md) - **轻量级**:无需额外的中间件或服务,仅依赖JDBC和SLF4J - **低侵入性**:基于时间戳查询,不需要在数据变更的时候进行消息/事件发布 - **高效**:使用复合索引(updated_at,id)进行高效查询 - **灵活**:支持自定义数据处理和错误处理,初始装载和积压数据自动分区处理 ## 要求 - Java 17+ - MySQL 8.0+ ## 适用场景 中低频率变更数据,实时性要求不高的数据同步或监控场景。高负载且数据高频变更的情况下对TPS,QPS,平均响应时间,最大响应时间等关键性能指标影响不到2%。[基准测试报告](bench_test.md) > 实测验证:高频变更且高负载情况,调整轮询间隔没有效果,主要是大量的数据加载影响TPS > 低频变更的数据监控消耗可忽略不计; ## 使用前提 在开始使用PCDC之前,您需要确保以下先决条件已满足: ### 必要的数据库表 PCDC需要以下两个表才能正常工作: #### 进度表 每个被监控的表都需要一个对应的进度表,用于记录处理进度。表名格式为`zzz_pcdc_progress_{your_table_name}`。例如,如果目标表名为"users",则进度表名为"zzz_pcdc_progress_users"。 ```sql CREATE TABLE IF NOT EXISTS `zzz_pcdc_progress_{your_table_name}` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '自增主键(唯一标识,必须前置)', `upper_boundary` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'CDC时间分区上边界(分区不包含边界)', `is_complete` tinyint NOT NULL DEFAULT '0' COMMENT '是否处理完成(复合索引首列,高频过滤)', `lock_expire_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '锁过期时间(复合索引次列,与is_complete相邻)', `worker_id` int NOT NULL DEFAULT '0' COMMENT '处理线程ID(最后的锁持有者)', `error_counter` smallint NOT NULL DEFAULT '0' COMMENT '异常计数器(出错标记)', `progress_datetime` datetime NOT NULL DEFAULT '1000-01-01 00:00:00' COMMENT '处理进度时间戳精确到秒', `progress_id` bigint NOT NULL DEFAULT '0' COMMENT '处理进度ID', `processed_qty` int NOT NULL DEFAULT '0' COMMENT '已处理数量', `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `idx_cdc_upper` (`upper_boundary`), KEY `idx_cdc_is_complete` (`is_complete`,`lock_expire_time`,`upper_boundary`) ) ENGINE=InnoDB ROW_FORMAT=COMPACT AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Change Data Capture Progress'; ``` #### 序列表 PCDC还需要一个序列表: ```sql CREATE TABLE IF NOT EXISTS `zzz_pcdc_sequence` ( `sequence` bigint NOT NULL AUTO_INCREMENT, PRIMARY KEY (`sequence`) ) ENGINE=InnoDB DEFAULT CHARSET=ascii ROW_FORMAT=COMPRESSED COMMENT='这是一张序列表,用于分配worker_id'; ``` > **注意**:请确保在启动PCDC之前创建这些表。表名中的`{your_table_name}`应替换为您要监控的实际表名。 ### 数据库索引要求 - 目标表必须有复合索引(updated_at,id),否则PCDC将无法启动 - 目标表必须有数值型主键 - 目标表不能物理删除数据 - updated_at 字段最好完全由数据库维护:datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 记录变更时间戳 - 在应用层设置updated_at字段的值有一定风险:如果设置的时间早于数据库当前时间有可能导致捕获不到数据变更。 ### PCDC 工作原理说明 #### 1. 时间分区规则 - `zzz_pcdc_progress_*` 表实际上表示数据分区和分区进度,按照 `throttleSeconds` 的值(假设为 30 秒),将时间线划分为连续的左闭右开区间(例如 `['2025-01-01 00:00:00', '2025-01-01 00:00:30')`)。 - 每个时间分区对应一条 progress 记录: - `upper_boundary` 字段表示分区的上边界(不包含); - 分区下边界为前一条记录的 `upper_boundary`。 - 每条 progress 记录使用 progress_datetime 和 progress_id 字段追踪处理进度;对应目标表复合索引(updated_at,id) - 如果最近的时间分区没有数据更新,则不会写入progress。 #### 2. 数据匹配规则 被监控表中某条记录的 `updated_at` 字段若落在某时间分区内,PCDC 在处理对应 progress 记录时,会加载该数据并调用 `dataHandler` 进行处理。 #### 3. Master-Worker 工作模式 Master 线程定期(pollingIntervalSeconds)执行以下流程: 1. 检查最近的时间分区内是否有数据变更,通过(updated_at,id)索引查询max(updated_at),max(updated_at) > 最新的progress的upper_boundary则插入一条分区 progress 记录; 2. 检查是否有未完成的 progress 记录,若有则提交给 worker 线程池处理。 > 新生成的的upper_boundary不会超过数据库的当前时间,如果超过了就要等待下一个轮询周期到来才会插入新progress #### 4. 并发与锁机制 - 每条 progress 记录同一时间仅由一个线程处理,通过 `lock_expire_time` 字段控制并发; - 处理失败的分区会在 `lock_expire_time` 后重新处理,且不影响后续分区的正常处理。 #### 5. 处理效率与进度追踪 - 强制使用监控表的 `(updated_at, id)` 索引(`FORCE INDEX`),提升处理效率; - 通过 `progress_datetime` 和 `progress_id` 字段追踪处理进度。 #### 6. 特殊场景说明 - **数据更新跨分区**:若记录在 progress 处理期间二次更新,导致 `updated_at` 超过当前分区 `upper_boundary`,则由后续分区处理; - **处理顺序**:不保证处理顺序: - 当 worker 线程数 > 1 时,不同时间分区可并行处理; - 即使 worker 线程数为 1,由于失败重试(如某个分区处理失败后,后续分区先处理,失败分区在锁过期后重新加入处理队列),最终处理顺序也可能与分区的时间先后顺序不一致。 #### 7. 时序图 ```mermaid sequenceDiagram participant Master线程 participant Worker线程池 participant progress表 participant 被监控表 loop 定期执行(pollingIntervalSeconds) Master线程->>被监控表: 检查最近时间分区是否有数据变更
(通过(updated_at,id)索引查询max(updated_at)) alt max(updated_at) > 上一个progress的upper_boundary Master线程->>progress表: 插入progress记录(状态:未完成,错误计数:0,lock_expire_time:初始值) end Master线程->>progress表: 扫描未完成的progress记录
(按upper_boundary升序排序) Master线程->>Worker线程池: 顺序提交progress到worker线程池 Worker线程池->>progress表: 获取锁(更新lock_expire_time为当前时间+超时值) Worker线程池->>被监控表: 加载被监控表中updated_at在当前时间分区内的记录
(强制使用(updated_at,id)索引) Worker线程池->>Worker线程池: 调用dataHandler处理数据 alt 处理成功 Worker线程池->>progress表: 1. 设置状态为完成
2. 错误计数清零
3. 更新最终progress进度 else 处理失败 Worker线程池->>Worker线程池: 调用errorHandler处理异常 Worker线程池->>progress表: 1. 保持状态为未完成
2. 错误计数 = 原计数 + 1
3. 不修改lock_expire_time end end Note over Master线程,progress表: 未完成记录在lock_expire_time到期前
不会被Master扫描,到期后可被Master重新提交给worker线程池处理 Note over Master线程,Worker线程池: 即使按upper_boundary升序提交,因失败重试机制
实际处理顺序仍可能与时间分区顺序不一致 ``` ## 安装 ### Maven ```xml org.zzz pcdc-mysql 1.0.0 ``` ### Gradle ```groovy implementation 'org.zzz:pcdc-mysql:1.0.0' ``` ## 使用方法 ### 基本用法 ```java // 创建数据源 DataSource dataSource = createDataSource(); // 创建数据处理器 PcdcDataHandler dataHandler = (tableConfig, records) -> { // 处理变更的数据 List idList = records.stream() .map(m -> m.get(tableConfig.getIdColumnName())) .toList(); System.out.println("处理更新的数据: " + idList); }; // 创建错误处理器 PcdcErrorHandler errorHandler = new PcdcErrorHandler() { @Override public void onError(PcdcTableConfig tableConfig, List> data, Exception e) { List idList = data.stream() .map(m -> m.get(tableConfig.getIdColumnName())) .toList(); System.err.println("处理数据出错: " + tableConfig.getCdcTableName() + " id= " + idList); e.printStackTrace(); } }; // 创建并配置PCDC Pcdc pcdc = new PcdcBuilder(dataSource, "your_table") .idColumn("id") .updatedAtColumn("updated_at") .pollingIntervalSeconds(30) .throttleSeconds(30) .batchWorkSize(1000) .batchWorkTimeoutSeconds(300) .partitionThreshold(10000) .workerPoolSize(5) .dataHandler(dataHandler) .errorHandler(errorHandler) .build(); // 启动PCDC pcdc.startup(); // 在应用程序关闭时停止PCDC Runtime.getRuntime().addShutdownHook(new Thread(pcdc::shutdown)); ``` ### Spring Boot集成 ```java @Configuration public class PcdcConfig { @Bean(initMethod = "startup", destroyMethod = "shutdown") public Pcdc pcdc(DataSource dataSource) { // 创建数据处理器 PcdcDataHandler dataHandler = (tableConfig, records) -> { // 处理变更的数据 List idList = records.stream() .map(m -> m.get(tableConfig.getIdColumnName())) .toList(); System.out.println("处理更新的数据: " + idList); }; // 创建错误处理器 PcdcErrorHandler errorHandler = new PcdcErrorHandler() { @Override public void onError(PcdcTableConfig tableConfig, List> data, Exception e) { List idList = data.stream() .map(m -> m.get(tableConfig.getIdColumnName())) .toList(); System.err.println("处理数据出错: " + tableConfig.getCdcTableName() + " id= " + idList); e.printStackTrace(); } }; return new PcdcBuilder(dataSource, "your_table") .dataHandler(dataHandler) .errorHandler(errorHandler) .build(); } } ``` ## 配置选项 PCDC提供了多种配置选项,可以通过PcdcBuilder进行设置: | 方法 | 描述 | 默认值 | |------|------|--------| | `PcdcBuilder(DataSource, String)` | 构造函数,设置数据源和目标表名 | 必需 | | `idColumn(String)` | 设置ID列名 | "id" | | `updatedAtColumn(String)` | 设置更新时间戳列名 | "updated_at" | | `pollingIntervalSeconds(int)` | 设置轮询间隔(秒)(最小值10,应大于等于throttleSeconds) | 30 | | `throttleSeconds(int)` | 设置节流秒数(最小值10) | 30 | | `batchWorkSize(int)` | 设置批处理大小 | 1000 | | `batchWorkTimeoutSeconds(int)` | 设置批处理超时时间(秒) | 300 | | `partitionThreshold(int)` | 设置分区阈值 | 10000 | | `workerPoolSize(int)` | 设置工作线程池大小 | 5 | | `dataHandler(PcdcDataHandler)` | 设置数据处理器 | 必需 | | `errorHandler(PcdcErrorHandler)` | 设置错误处理器 | 必需 | ## 示例 ### 完整示例 ```java @Configuration public class PcdcConfig { private static final Logger logger = LoggerFactory.getLogger(PcdcConfig.class); @Bean(initMethod = "startup", destroyMethod = "shutdown") public Pcdc userTableWatcher(DataSource dataSource) { // 创建数据处理器 PcdcDataHandler dataHandler = (tableConfig, records) -> { for (Map record : records) { // 处理用户数据变更 Long userId = (Long) record.get("id"); String name = (String) record.get("name"); String email = (String) record.get("email"); System.out.println("User updated: " + userId + ", " + name + ", " + email); // 执行其他业务逻辑 // ... } }; // 创建错误处理器 PcdcErrorHandler errorHandler = new PcdcErrorHandler() { @Override public void onError(PcdcTableConfig tableConfig, List> data, Exception e) { // 记录错误 logger.error("Error processing user data change for table: " + tableConfig.getCdcTableName(), e); } }; return new PcdcBuilder(dataSource, "users") .idColumn("id") .updatedAtColumn("updated_at") .pollingIntervalSeconds(30) .throttleSeconds(30) .batchWorkSize(500) .batchWorkTimeoutSeconds(300) .partitionThreshold(10000) .workerPoolSize(5) .dataHandler(dataHandler) .errorHandler(errorHandler) .build(); } } ``` ### 多表监控 ```java @Configuration public class MultiTablePcdcConfig { private static final Logger logger = LoggerFactory.getLogger(MultiTablePcdcConfig.class); @Bean(initMethod = "startup", destroyMethod = "shutdown") public Pcdc userTableWatcher(DataSource dataSource) { return new PcdcBuilder(dataSource, "users") .dataHandler(this::processUserData) .errorHandler(this::handleError) .build(); } @Bean(initMethod = "startup", destroyMethod = "shutdown") public Pcdc orderTableWatcher(DataSource dataSource) { return new PcdcBuilder(dataSource, "orders") .dataHandler(this::processOrderData) .errorHandler(this::handleError) .build(); } private void processUserData(PcdcTableConfig tableConfig, List> records) { // 处理用户数据 for (Map record : records) { Long userId = (Long) record.get("id"); // 处理用户数据... System.out.println("处理用户数据: " + userId); } } private void processOrderData(PcdcTableConfig tableConfig, List> records) { // 处理订单数据 for (Map record : records) { Long orderId = (Long) record.get("id"); // 处理订单数据... System.out.println("处理订单数据: " + orderId); } } private void handleError(PcdcTableConfig tableConfig, List> data, Exception e) { // 处理错误 logger.error("Error in PCDC for table: " + tableConfig.getCdcTableName(), e); } } ``` ## 注意事项 1. **时间戳列**:默认情况下,PCDC使用名为"updated_at"的列作为时间戳列,可以通过`updatedAtColumnName`方法进行配置。 2. **数据处理**:数据处理器应该是幂等的,因为同一条数据可能会被多次处理。 3. **错误处理**:错误处理器应该妥善处理异常,以确保PCDC能够继续运行。 4. **配置建议**:`pollingIntervalSeconds` 应该大于等于 `throttleSeconds`。 ======= ## 许可证 PCDC-MySQL使用MIT许可证。详情请参阅[LICENSE](pcdc-mysql/LICENSE)文件。