# pcdc
**Repository Path**: frostforest/pcdc
## Basic Information
- **Project Name**: pcdc
- **Description**: Poorman's Change Data Capture For MySQL
- **Primary Language**: Java
- **License**: MIT
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 8
- **Forks**: 0
- **Created**: 2025-07-12
- **Last Updated**: 2025-08-31
## Categories & Tags
**Categories**: dbmanager
**Tags**: cdc, MySQL, Java
## README
# PCDC-MySQL
[](https://opensource.org/licenses/MIT)
PCDC-MySQL是一个轻量级的MySQL变更数据捕获(CDC)工具,用于实时监控和处理数据库变更。它基于变更时间戳工作,定期扫描目标表的复合索引(updated_at,id),并将变更的数据传递给用户定义的处理器。
## 特性
- **可靠**:使用进度追踪表记录处理进度,确保数据变更不会遗漏,高度可靠;重启的时候可以从上一次的进度继续处理
- **并发安全**:可以在分布式环境使用,不需要额外加锁
- **秒级延迟**:秒级以上的延迟,低频变更的数据监控消耗可忽略不计;高负载且数据高频变更的情况下对TPS,QPS,平均响应时间,最大响应时间等关键性能指标影响不到2%[基准测试报告](bench_test.md)
- **轻量级**:无需额外的中间件或服务,仅依赖JDBC和SLF4J
- **低侵入性**:基于时间戳查询,不需要在数据变更的时候进行消息/事件发布
- **高效**:使用复合索引(updated_at,id)进行高效查询
- **灵活**:支持自定义数据处理和错误处理,初始装载和积压数据自动分区处理
## 要求
- Java 17+
- MySQL 8.0+
## 适用场景
中低频率变更数据,实时性要求不高的数据同步或监控场景。高负载且数据高频变更的情况下对TPS,QPS,平均响应时间,最大响应时间等关键性能指标影响不到2%。[基准测试报告](bench_test.md)
> 实测验证:高频变更且高负载情况,调整轮询间隔没有效果,主要是大量的数据加载影响TPS
> 低频变更的数据监控消耗可忽略不计;
## 使用前提
在开始使用PCDC之前,您需要确保以下先决条件已满足:
### 必要的数据库表
PCDC需要以下两个表才能正常工作:
#### 进度表
每个被监控的表都需要一个对应的进度表,用于记录处理进度。表名格式为`zzz_pcdc_progress_{your_table_name}`。例如,如果目标表名为"users",则进度表名为"zzz_pcdc_progress_users"。
```sql
CREATE TABLE IF NOT EXISTS `zzz_pcdc_progress_{your_table_name}` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '自增主键(唯一标识,必须前置)',
`upper_boundary` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'CDC时间分区上边界(分区不包含边界)',
`is_complete` tinyint NOT NULL DEFAULT '0' COMMENT '是否处理完成(复合索引首列,高频过滤)',
`lock_expire_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '锁过期时间(复合索引次列,与is_complete相邻)',
`worker_id` int NOT NULL DEFAULT '0' COMMENT '处理线程ID(最后的锁持有者)',
`error_counter` smallint NOT NULL DEFAULT '0' COMMENT '异常计数器(出错标记)',
`progress_datetime` datetime NOT NULL DEFAULT '1000-01-01 00:00:00' COMMENT '处理进度时间戳精确到秒',
`progress_id` bigint NOT NULL DEFAULT '0' COMMENT '处理进度ID',
`processed_qty` int NOT NULL DEFAULT '0' COMMENT '已处理数量',
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_cdc_upper` (`upper_boundary`),
KEY `idx_cdc_is_complete` (`is_complete`,`lock_expire_time`,`upper_boundary`)
) ENGINE=InnoDB
ROW_FORMAT=COMPACT
AUTO_INCREMENT=1
DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_unicode_ci
COMMENT='Change Data Capture Progress';
```
#### 序列表
PCDC还需要一个序列表:
```sql
CREATE TABLE IF NOT EXISTS `zzz_pcdc_sequence` (
`sequence` bigint NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`sequence`)
) ENGINE=InnoDB
DEFAULT CHARSET=ascii
ROW_FORMAT=COMPRESSED
COMMENT='这是一张序列表,用于分配worker_id';
```
> **注意**:请确保在启动PCDC之前创建这些表。表名中的`{your_table_name}`应替换为您要监控的实际表名。
### 数据库索引要求
- 目标表必须有复合索引(updated_at,id),否则PCDC将无法启动
- 目标表必须有数值型主键
- 目标表不能物理删除数据
- updated_at 字段最好完全由数据库维护:datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 记录变更时间戳
- 在应用层设置updated_at字段的值有一定风险:如果设置的时间早于数据库当前时间有可能导致捕获不到数据变更。
### PCDC 工作原理说明
#### 1. 时间分区规则
- `zzz_pcdc_progress_*` 表实际上表示数据分区和分区进度,按照 `throttleSeconds` 的值(假设为 30 秒),将时间线划分为连续的左闭右开区间(例如 `['2025-01-01 00:00:00', '2025-01-01 00:00:30')`)。
- 每个时间分区对应一条 progress 记录:
- `upper_boundary` 字段表示分区的上边界(不包含);
- 分区下边界为前一条记录的 `upper_boundary`。
- 每条 progress 记录使用 progress_datetime 和 progress_id 字段追踪处理进度;对应目标表复合索引(updated_at,id)
- 如果最近的时间分区没有数据更新,则不会写入progress。
#### 2. 数据匹配规则
被监控表中某条记录的 `updated_at` 字段若落在某时间分区内,PCDC 在处理对应 progress 记录时,会加载该数据并调用 `dataHandler` 进行处理。
#### 3. Master-Worker 工作模式
Master 线程定期(pollingIntervalSeconds)执行以下流程:
1. 检查最近的时间分区内是否有数据变更,通过(updated_at,id)索引查询max(updated_at),max(updated_at) > 最新的progress的upper_boundary则插入一条分区 progress 记录;
2. 检查是否有未完成的 progress 记录,若有则提交给 worker 线程池处理。
> 新生成的的upper_boundary不会超过数据库的当前时间,如果超过了就要等待下一个轮询周期到来才会插入新progress
#### 4. 并发与锁机制
- 每条 progress 记录同一时间仅由一个线程处理,通过 `lock_expire_time` 字段控制并发;
- 处理失败的分区会在 `lock_expire_time` 后重新处理,且不影响后续分区的正常处理。
#### 5. 处理效率与进度追踪
- 强制使用监控表的 `(updated_at, id)` 索引(`FORCE INDEX`),提升处理效率;
- 通过 `progress_datetime` 和 `progress_id` 字段追踪处理进度。
#### 6. 特殊场景说明
- **数据更新跨分区**:若记录在 progress 处理期间二次更新,导致 `updated_at` 超过当前分区 `upper_boundary`,则由后续分区处理;
- **处理顺序**:不保证处理顺序:
- 当 worker 线程数 > 1 时,不同时间分区可并行处理;
- 即使 worker 线程数为 1,由于失败重试(如某个分区处理失败后,后续分区先处理,失败分区在锁过期后重新加入处理队列),最终处理顺序也可能与分区的时间先后顺序不一致。
#### 7. 时序图
```mermaid
sequenceDiagram
participant Master线程
participant Worker线程池
participant progress表
participant 被监控表
loop 定期执行(pollingIntervalSeconds)
Master线程->>被监控表: 检查最近时间分区是否有数据变更
(通过(updated_at,id)索引查询max(updated_at))
alt max(updated_at) > 上一个progress的upper_boundary
Master线程->>progress表: 插入progress记录(状态:未完成,错误计数:0,lock_expire_time:初始值)
end
Master线程->>progress表: 扫描未完成的progress记录
(按upper_boundary升序排序)
Master线程->>Worker线程池: 顺序提交progress到worker线程池
Worker线程池->>progress表: 获取锁(更新lock_expire_time为当前时间+超时值)
Worker线程池->>被监控表: 加载被监控表中updated_at在当前时间分区内的记录
(强制使用(updated_at,id)索引)
Worker线程池->>Worker线程池: 调用dataHandler处理数据
alt 处理成功
Worker线程池->>progress表: 1. 设置状态为完成
2. 错误计数清零
3. 更新最终progress进度
else 处理失败
Worker线程池->>Worker线程池: 调用errorHandler处理异常
Worker线程池->>progress表: 1. 保持状态为未完成
2. 错误计数 = 原计数 + 1
3. 不修改lock_expire_time
end
end
Note over Master线程,progress表: 未完成记录在lock_expire_time到期前
不会被Master扫描,到期后可被Master重新提交给worker线程池处理
Note over Master线程,Worker线程池: 即使按upper_boundary升序提交,因失败重试机制
实际处理顺序仍可能与时间分区顺序不一致
```
## 安装
### Maven
```xml
org.zzz
pcdc-mysql
1.0.0
```
### Gradle
```groovy
implementation 'org.zzz:pcdc-mysql:1.0.0'
```
## 使用方法
### 基本用法
```java
// 创建数据源
DataSource dataSource = createDataSource();
// 创建数据处理器
PcdcDataHandler dataHandler = (tableConfig, records) -> {
// 处理变更的数据
List