TiCDC同步数据到Kafka原创
# TiCDC同步数据到Kafka
# 一、TiCDC简介
TiCDC (TiDB Change Data Capture) 是TiDB官方提供的数据变更捕获工具,用于将TiDB集群的增量数据变更实时同步到下游系统。它具有以下特点:
- 低延迟:毫秒级别的同步延迟
- 高可用:多节点部署,自动故障转移
- 水平扩展:可通过增加节点提升同步能力
- 集成简单:无需修改TiDB集群配置
- 多种输出格式:支持Canal-JSON、Avro、Maxwell等格式
# 二、部署准备
# 2.1 环境要求
- TiDB 版本 ≥ 4.0.0
- 确保TiCDC服务器与TiDB集群、Kafka集群网络互通
- 推荐TiCDC节点配置:8核CPU,16GB内存,SSD磁盘
# 2.2 安装TiCDC
TiCDC组件通常随TiDB集群一起部署,可通过TiUP工具进行管理:
# 使用TiUP部署包含TiCDC组件的TiDB集群
tiup cluster deploy tidb-test v5.4.0 ./topology.yaml --user tidb
# 向现有集群扩容TiCDC节点
tiup cluster scale-out tidb-test ./scale-out.yaml --user tidb
1
2
3
4
5
2
3
4
5
scale-out.yaml示例:
cdc_servers:
- host: 10.0.1.20
port: 8300
deploy_dir: "/tidb-deploy/cdc-8300"
data_dir: "/tidb-data/cdc-8300"
log_dir: "/tidb-deploy/cdc-8300/log"
1
2
3
4
5
6
2
3
4
5
6
# 三、配置文件详解
TiCDC同步任务需要创建配置文件(例如tidb2kafka.toml
),用于定义同步行为:
# 是否区分大小写
case-sensitive = true
# 启用老值存储,用于输出变更前的数据(用于UPDATE事件)
enable-old-value = true
# 同步任务的启动模式
# 可选值: "normal"(默认), "offline"(离线模式)
mode = "normal"
# 过滤器配置
[filter]
# 忽略特定事务的起始时间戳
ignore-txn-start-ts = [1, 2]
# 需要同步的表规则列表
# 支持正则表达式,例如 'test.*' 表示同步test库下所有表
rules = [
'my_db.table1',
'my_db.table2',
'my_db.table3'
]
# 解析器配置
[mounter]
# 解析器工作线程数
worker-num = 16
# 调度器配置
[[sink.dispatchers]]
# 匹配规则,决定哪些表使用此调度策略
matcher = [
'my_db.table1',
'my_db.table2',
'my_db.table3'
]
# 调度策略:
# - default: 默认策略,确保事务完整性
# - ts: 按时间戳调度,提高并发性能
# - rowid: 按行ID调度,适用于有主键的表
# - table: 按表调度,每个表一个分区
dispatcher = 'ts'
# Sink 配置
[sink]
# 输出数据格式:
# - canal-json: 阿里Canal格式
# - avro: Avro格式,需配合Schema Registry
# - maxwell: Maxwell格式
# - open-protocol: TiCDC开放协议
protocol = "canal-json"
# 失败重试配置
[sink.retry]
# 最大重试次数,0表示无限重试
max-retry-times = 3
# 重试间隔(毫秒)
retry-interval = 1000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 四、常用操作命令
# 4.1 创建同步任务
cdc cli changefeed create \
--pd="http://10.10.10.105:2379" \
--sink-uri="kafka://10.10.10.231:9092/my_topic?protocol=canal-json&kafka-version=2.6.0&partition-num=10&max-message-bytes=67108864&replication-factor=3" \
--changefeed-id="tidb-to-kafka" \
--config="tidb2kafka.toml" \
--start-ts="433686579570032640"
1
2
3
4
5
6
2
3
4
5
6
参数详解:
--pd
: PD集群地址,支持多地址用逗号分隔--sink-uri
: Kafka连接URI,包含以下参数:protocol
: 输出格式,如canal-jsonkafka-version
: Kafka版本partition-num
: Kafka主题分区数max-message-bytes
: 单条消息最大字节数replication-factor
: 副本因子ca
: CA证书路径(SSL连接时使用)cert
: 客户端证书路径(SSL连接时使用)key
: 客户端密钥路径(SSL连接时使用)
--changefeed-id
: 同步任务ID,集群内唯一--config
: 配置文件路径--start-ts
: 同步起始时间戳,可通过tidb-binlog
工具获取,0
表示从当前时间开始
# 4.2 任务管理命令
# 查看所有同步任务
cdc cli changefeed list --pd="http://10.10.10.105:2379"
# 查询特定任务状态
cdc cli changefeed query --pd="http://10.10.10.105:2379" --changefeed-id="tidb-to-kafka"
# 暂停同步任务
cdc cli changefeed pause --pd="http://10.10.10.105:2379" --changefeed-id="tidb-to-kafka"
# 恢复同步任务
cdc cli changefeed resume --pd="http://10.10.10.105:2379" --changefeed-id="tidb-to-kafka"
# 更新任务配置
cdc cli changefeed update \
--pd="http://10.10.10.105:2379" \
--changefeed-id="tidb-to-kafka" \
--config="updated_config.toml"
# 删除同步任务
cdc cli changefeed remove --pd="http://10.10.10.105:2379" --changefeed-id="tidb-to-kafka"
# 查看TiCDC节点状态
cdc cli capture list --pd="http://10.10.10.105:2379"
# 查看任务处理器状态
cdc cli processor list --pd="http://10.10.10.105:2379"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 五、数据格式示例
使用canal-json格式时,Kafka中的消息结构如下:
{
"id": 123,
"database": "my_db",
"table": "my_table",
"pkNames": ["id"],
"isDdl": false,
"type": "UPDATE",
"es": 1638352903000,
"ts": 1638352903123,
"sql": "",
"sqlType": {"id": 4, "name": 12, "age": 4},
"data": [{"id": "1", "name": "张三", "age": "25"}],
"old": [{"id": "1", "name": "张三", "age": "24"}]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
字段说明:
id
: 消息唯一IDdatabase
: 数据库名table
: 表名pkNames
: 主键列名isDdl
: 是否为DDL操作type
: 操作类型(INSERT/UPDATE/DELETE)es
: 事件时间戳(毫秒)ts
: 处理时间戳(毫秒)sql
: DDL语句(仅DDL事件)sqlType
: 列数据类型data
: 变更后数据old
: 变更前数据(仅UPDATE事件且enable-old-value=true时)
# 六、性能调优
# 6.1 TiCDC参数调优
# 提高并行度
[mounter]
worker-num = 32
# 使用表级分发提高并行度
[[sink.dispatchers]]
dispatcher = "table"
# 调整批处理大小
[sink]
flush-interval = 500 # 毫秒
max-batch-size = 1024 # 条数
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 6.2 Kafka参数调优
- 增加分区数提高并行处理能力
- 调整副本因子平衡可靠性和性能
- 配置合适的消息大小限制
# 七、常见问题与解决方案
# 7.1 同步延迟高
可能原因:
- TiCDC节点资源不足
- Kafka集群负载高
- 网络带宽限制
解决方案:
- 增加TiCDC节点数量
- 优化配置文件中的worker-num参数
- 检查网络连接质量
# 7.2 数据丢失
可能原因:
- TiCDC节点异常退出
- Kafka集群不可用
解决方案:
- 设置合适的Kafka副本因子(至少3)
- 启用Kafka的acks=all配置
- 定期检查同步状态和监控告警
# 7.3 同步任务卡住
可能原因:
- 大事务处理
- PD节点异常
解决方案:
- 检查TiCDC日志
- 使用
cdc cli changefeed query
查看任务状态 - 必要时重启同步任务
# 八、最佳实践
- 合理规划资源:TiCDC节点数量应与需要同步的数据量相匹配
- 监控同步延迟:设置监控和告警,及时发现同步问题
- 定期检查状态:使用
cdc cli changefeed query
命令检查同步状态 - 增量同步策略:生产环境中指定合适的
start-ts
,避免全量同步 - 表结构变更:DDL操作前暂停同步任务,操作完成后恢复
- 安全连接:生产环境使用SSL加密连接Kafka集群
- 定期备份配置:保存同步任务配置,便于灾难恢复
上次更新: 4/24/2025