Carry の Blog Carry の Blog
首页
  • Nginx
  • Prometheus
  • Iptables
  • Systemd
  • Firewalld
  • Docker
  • Sshd
  • DBA工作笔记
  • MySQL
  • Redis
  • TiDB
  • Elasticsearch
  • Python
  • Shell
  • MySQL8-SOP手册
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Carry の Blog

好记性不如烂键盘
首页
  • Nginx
  • Prometheus
  • Iptables
  • Systemd
  • Firewalld
  • Docker
  • Sshd
  • DBA工作笔记
  • MySQL
  • Redis
  • TiDB
  • Elasticsearch
  • Python
  • Shell
  • MySQL8-SOP手册
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • MySQL

  • Redis

  • Keydb

  • TiDB

    • TiCDC同步数据到Kafka
      • 一、TiCDC简介
      • 二、部署准备
        • 2.1 环境要求
        • 2.2 安装TiCDC
      • 三、配置文件详解
      • 四、常用操作命令
        • 4.1 创建同步任务
        • 4.2 任务管理命令
      • 五、数据格式示例
      • 六、性能调优
        • 6.1 TiCDC参数调优
        • 6.2 Kafka参数调优
      • 七、常见问题与解决方案
        • 7.1 同步延迟高
        • 7.2 数据丢失
        • 7.3 同步任务卡住
      • 八、最佳实践
    • 对TiDB中算子的深入理解
    • TiDB使用 TTL (Time to Live) 定期删除过期数据
    • 如何移除TiDB中的表分区
    • TiDB配置文件调优
    • 深入解析TiFlash:原理、适用场景与调优实践
    • tidb fast ddl
  • MongoDB

  • Elasticsearch

  • Kafka

  • victoriametrics

  • BigData

  • Sqlserver

  • 数据库
  • TiDB
Carry の Blog
2022-03-10
目录

TiCDC同步数据到Kafka原创

# TiCDC同步数据到Kafka

# 一、TiCDC简介

TiCDC (TiDB Change Data Capture) 是TiDB官方提供的数据变更捕获工具,用于将TiDB集群的增量数据变更实时同步到下游系统。它具有以下特点:

  • 低延迟:毫秒级别的同步延迟
  • 高可用:多节点部署,自动故障转移
  • 水平扩展:可通过增加节点提升同步能力
  • 集成简单:无需修改TiDB集群配置
  • 多种输出格式:支持Canal-JSON、Avro、Maxwell等格式

# 二、部署准备

# 2.1 环境要求

  • TiDB 版本 ≥ 4.0.0
  • 确保TiCDC服务器与TiDB集群、Kafka集群网络互通
  • 推荐TiCDC节点配置:8核CPU,16GB内存,SSD磁盘

# 2.2 安装TiCDC

TiCDC组件通常随TiDB集群一起部署,可通过TiUP工具进行管理:

# 使用TiUP部署包含TiCDC组件的TiDB集群
tiup cluster deploy tidb-test v5.4.0 ./topology.yaml --user tidb

# 向现有集群扩容TiCDC节点
tiup cluster scale-out tidb-test ./scale-out.yaml --user tidb
1
2
3
4
5

scale-out.yaml示例:

cdc_servers:
  - host: 10.0.1.20
    port: 8300
    deploy_dir: "/tidb-deploy/cdc-8300"
    data_dir: "/tidb-data/cdc-8300"
    log_dir: "/tidb-deploy/cdc-8300/log"
1
2
3
4
5
6

# 三、配置文件详解

TiCDC同步任务需要创建配置文件(例如tidb2kafka.toml),用于定义同步行为:

# 是否区分大小写
case-sensitive = true

# 启用老值存储,用于输出变更前的数据(用于UPDATE事件)
enable-old-value = true

# 同步任务的启动模式
# 可选值: "normal"(默认), "offline"(离线模式)
mode = "normal"

# 过滤器配置
[filter]
# 忽略特定事务的起始时间戳
ignore-txn-start-ts = [1, 2]

# 需要同步的表规则列表
# 支持正则表达式,例如 'test.*' 表示同步test库下所有表
rules = [
    'my_db.table1',
    'my_db.table2',
    'my_db.table3'
]

# 解析器配置
[mounter]
# 解析器工作线程数
worker-num = 16

# 调度器配置
[[sink.dispatchers]]
# 匹配规则,决定哪些表使用此调度策略
matcher = [
    'my_db.table1',
    'my_db.table2',
    'my_db.table3'
]
# 调度策略:
# - default: 默认策略,确保事务完整性
# - ts: 按时间戳调度,提高并发性能
# - rowid: 按行ID调度,适用于有主键的表
# - table: 按表调度,每个表一个分区
dispatcher = 'ts'

# Sink 配置
[sink]
# 输出数据格式:
# - canal-json: 阿里Canal格式
# - avro: Avro格式,需配合Schema Registry
# - maxwell: Maxwell格式
# - open-protocol: TiCDC开放协议
protocol = "canal-json"

# 失败重试配置
[sink.retry]
# 最大重试次数,0表示无限重试
max-retry-times = 3
# 重试间隔(毫秒)
retry-interval = 1000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

# 四、常用操作命令

# 4.1 创建同步任务

cdc cli changefeed create \
    --pd="http://10.10.10.105:2379" \
    --sink-uri="kafka://10.10.10.231:9092/my_topic?protocol=canal-json&kafka-version=2.6.0&partition-num=10&max-message-bytes=67108864&replication-factor=3" \
    --changefeed-id="tidb-to-kafka" \
    --config="tidb2kafka.toml" \
    --start-ts="433686579570032640"
1
2
3
4
5
6

参数详解:

  • --pd: PD集群地址,支持多地址用逗号分隔
  • --sink-uri: Kafka连接URI,包含以下参数:
    • protocol: 输出格式,如canal-json
    • kafka-version: Kafka版本
    • partition-num: Kafka主题分区数
    • max-message-bytes: 单条消息最大字节数
    • replication-factor: 副本因子
    • ca: CA证书路径(SSL连接时使用)
    • cert: 客户端证书路径(SSL连接时使用)
    • key: 客户端密钥路径(SSL连接时使用)
  • --changefeed-id: 同步任务ID,集群内唯一
  • --config: 配置文件路径
  • --start-ts: 同步起始时间戳,可通过tidb-binlog工具获取,0表示从当前时间开始

# 4.2 任务管理命令

# 查看所有同步任务
cdc cli changefeed list --pd="http://10.10.10.105:2379"

# 查询特定任务状态
cdc cli changefeed query --pd="http://10.10.10.105:2379" --changefeed-id="tidb-to-kafka"

# 暂停同步任务
cdc cli changefeed pause --pd="http://10.10.10.105:2379" --changefeed-id="tidb-to-kafka"

# 恢复同步任务
cdc cli changefeed resume --pd="http://10.10.10.105:2379" --changefeed-id="tidb-to-kafka"

# 更新任务配置
cdc cli changefeed update \
    --pd="http://10.10.10.105:2379" \
    --changefeed-id="tidb-to-kafka" \
    --config="updated_config.toml"

# 删除同步任务
cdc cli changefeed remove --pd="http://10.10.10.105:2379" --changefeed-id="tidb-to-kafka"

# 查看TiCDC节点状态
cdc cli capture list --pd="http://10.10.10.105:2379"

# 查看任务处理器状态
cdc cli processor list --pd="http://10.10.10.105:2379"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 五、数据格式示例

使用canal-json格式时,Kafka中的消息结构如下:

{
  "id": 123,
  "database": "my_db",
  "table": "my_table",
  "pkNames": ["id"],
  "isDdl": false,
  "type": "UPDATE",
  "es": 1638352903000,
  "ts": 1638352903123,
  "sql": "",
  "sqlType": {"id": 4, "name": 12, "age": 4},
  "data": [{"id": "1", "name": "张三", "age": "25"}],
  "old": [{"id": "1", "name": "张三", "age": "24"}]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

字段说明:

  • id: 消息唯一ID
  • database: 数据库名
  • table: 表名
  • pkNames: 主键列名
  • isDdl: 是否为DDL操作
  • type: 操作类型(INSERT/UPDATE/DELETE)
  • es: 事件时间戳(毫秒)
  • ts: 处理时间戳(毫秒)
  • sql: DDL语句(仅DDL事件)
  • sqlType: 列数据类型
  • data: 变更后数据
  • old: 变更前数据(仅UPDATE事件且enable-old-value=true时)

# 六、性能调优

# 6.1 TiCDC参数调优

# 提高并行度
[mounter]
worker-num = 32

# 使用表级分发提高并行度
[[sink.dispatchers]]
dispatcher = "table"

# 调整批处理大小
[sink]
flush-interval = 500  # 毫秒
max-batch-size = 1024 # 条数
1
2
3
4
5
6
7
8
9
10
11
12

# 6.2 Kafka参数调优

  • 增加分区数提高并行处理能力
  • 调整副本因子平衡可靠性和性能
  • 配置合适的消息大小限制

# 七、常见问题与解决方案

# 7.1 同步延迟高

可能原因:

  • TiCDC节点资源不足
  • Kafka集群负载高
  • 网络带宽限制

解决方案:

  • 增加TiCDC节点数量
  • 优化配置文件中的worker-num参数
  • 检查网络连接质量

# 7.2 数据丢失

可能原因:

  • TiCDC节点异常退出
  • Kafka集群不可用

解决方案:

  • 设置合适的Kafka副本因子(至少3)
  • 启用Kafka的acks=all配置
  • 定期检查同步状态和监控告警

# 7.3 同步任务卡住

可能原因:

  • 大事务处理
  • PD节点异常

解决方案:

  • 检查TiCDC日志
  • 使用cdc cli changefeed query查看任务状态
  • 必要时重启同步任务

# 八、最佳实践

  1. 合理规划资源:TiCDC节点数量应与需要同步的数据量相匹配
  2. 监控同步延迟:设置监控和告警,及时发现同步问题
  3. 定期检查状态:使用cdc cli changefeed query命令检查同步状态
  4. 增量同步策略:生产环境中指定合适的start-ts,避免全量同步
  5. 表结构变更:DDL操作前暂停同步任务,操作完成后恢复
  6. 安全连接:生产环境使用SSL加密连接Kafka集群
  7. 定期备份配置:保存同步任务配置,便于灾难恢复
#TiCDC#Kafka#数据同步
上次更新: 4/24/2025

← 安装配置 对TiDB中算子的深入理解→

最近更新
01
tidb fast ddl
04-04
02
TiDB配置文件调优 原创
04-03
03
如何移除TiDB中的表分区 原创
04-03
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式