作者: 懂的都懂原文来源:https://tidb.net/blog/66534a68
0. 背景介绍
本文将介绍如何将 TiDB 中的数据,通过 TiCDC 导入到 Kafka 中,继而被 Flink 消费的案例。
为了能够快速的验证整套流程的功能性,所有的组件都以单机的形式部署。如果需要在生产环境中部署,建议将每一个组件替换成高可用的集群部署方案。
其中,我们单独创建了一套 Zookeeper 单节点环境,Flink、Kafka、等组件共用这个 Zookeeper 环境。
针对于所有需要 JRE 的组件,如 Flink,Kafka,Zookeeper,考虑到升级 JRE 可能会影响到其他的应用,我们选择每个组件独立使用自己的 JRE 环境。
本文分为两个部分,其中,前五小节主要介绍基础环境的搭建,最后一个小节介绍了数据是如何在各个组件中流通的。
数据的流动经过了以下的组件:
TiDB 的 TiCDC 组件生成了 canal-json 协议的流数据
TiCDC 将流数据推送到 Kafka 的 Topic 中
Flink 通过 flink-sql-connector-kafka API,消费 Kafka Topic 中的数据
1. 应用场景介绍
TiDB + Flink 的结构,支持开发与运行多种不同种类的应用程序。
目前主要的特性主要包括:
批流一体化
精密的状态管理
事件时间支持
精确的一次状态一致性保障
Flink 可以运行在包括 YARN、Mesos、Kubernetes 在内的多种资源管理框架上,还支持裸机集群上独立部署。TiDB 可以部署 AWS、Kubernetes、GCP GKE 上,同时也支持使用 TiUP 在裸机集群上独立部署。
TiDB + Flink 结构常见的几类应用如下:
事件驱动型应用
- 反欺诈
异常检测
基于规则的报警
业务流程监控
数据分析应用
- 网络质量监控
产品更新及试验评估分析
事实数据即席分析
大规模图分析
数据管道应用
- 电商实时查询索引构建
电商持续 ETL
2. 环境介绍
2.1 操作系统环境
[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8
复制代码
2.2 软件环境
2.3 机器分配
| Hostname | IP | Component | | | || ——– | ————- | ———— | - | - | - || r21 | 192.168.12.21 | TiDB Cluster | | | || r22 | 192.168.12.22 | Kafka | | | || r23 | 192.168.12.23 | Flink | | | || r24 | 192.168.12.24 | Zookeeper | | | |
3. 部署 TiDB Cluster
与传统的单机数据库相比,TiDB 具有以下优势:
纯分布式架构,拥有良好的扩展性,支持弹性的扩缩容
支持 SQL,对外暴露 MySQL 的网络协议,并兼容大多数 MySQL 的语法,在大多数场景下可以直接替换 MySQL
默认支持高可用,在少数副本失效的情况下,数据库本身能够自动进行数据修复和故障转移,对业务透明
支持 ACID 事务,对于一些有强一致需求的场景友好,例如:银行转账
具有丰富的工具链生态,覆盖数据迁移、同步、备份等多种场景
在内核设计上,TiDB 分布式数据库将整体架构拆分成了多个模块,各模块之间互相通信,组成完整的 TiDB 系统。对应的架构图如下:
在本文中,我们只做最简单的功能测试,所以部署了一套单节点但副本的 TiDB,涉及到了以下的三个模块:
TiDB Server:SQL 层,对外暴露 MySQL 协议的连接 endpoint,负责接受客户端的连接,执行 SQL 解析和优化,最终生成分布式执行计划。
PD (Placement Driver) Server:整个 TiDB 集群的元信息管理模块,负责存储每个 TiKV 节点实时的数据分布情况和集群的整体拓扑结构,提供 TiDB Dashboard 管控界面,并为分布式事务分配事务 ID。
TiKV Server:负责存储数据,从外部看 TiKV 是一个分布式的提供事务的 Key-Value 存储引擎。
TiCDC:TiCDC 是一款通过拉取 TiKV 变更日志实现的 TiDB 增量数据同步工具,具有将数据还原到与上游任意 TSO 一致状态的能力,同时提供开放数据协议 (TiCDC Open Protocol),支持其他系统订阅数据变更。
3.1 TiUP 部署模板文件
# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
user: "tidb"
ssh_port: 22
deploy_dir: "/opt/tidb-c1/"
data_dir: "/opt/tidb-c1/data/"
# # Monitored variables are applied to all the machines.
#monitored:
# node_exporter_port: 19100
# blackbox_exporter_port: 39115
# deploy_dir: "/opt/tidb-c3/monitored"
# data_dir: "/opt/tidb-c3/data/monitored"
# log_dir: "/opt/tidb-c3/log/monitored"
# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
# # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
# # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
# # All configuration items use points to represent the hierarchy, e.g:
# # readpool.storage.use-unified-pool
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
tidb:
log.slow-threshold: 300
binlog.enable: false
binlog.ignore-error: false
tikv-client.copr-cache.enable: true
tikv:
server.grpc-concurrency: 4
raftstore.apply-pool-size: 2
raftstore.store-pool-size: 2
rocksdb.max-sub-compactions: 1
storage.block-cache.capacity: "16GB"
readpool.unified.max-thread-count: 12
readpool.storage.use-unified-pool: false
readpool.coprocessor.use-unified-pool: true
raftdb.rate-bytes-per-sec: 0
pd:
schedule.leader-schedule-limit: 4
schedule.region-schedule-limit: 2048
schedule.replica-schedule-limit: 64
pd_servers:
- host: 192.168.12.21
ssh_port: 22
name: "pd-2"
client_port: 12379
peer_port: 12380
deploy_dir: "/opt/tidb-c1/pd-12379"
data_dir: "/opt/tidb-c1/data/pd-12379"
log_dir: "/opt/tidb-c1/log/pd-12379"
numa_node: "0"
# # The following configs are used to overwrite the `server_configs.pd` values.
config:
schedule.max-merge-region-size: 20
schedule.max-merge-region-keys: 200000
tidb_servers:
- host: 192.168.12.21
ssh_port: 22
port: 14000
status_port: 12080
deploy_dir: "/opt/tidb-c1/tidb-14000"
log_dir: "/opt/tidb-c1/log/tidb-14000"
numa_node: "0"
# # The following configs are used to overwrite the `server_configs.tidb` values.
config:
log.slow-query-file: tidb-slow-overwrited.log
tikv-client.copr-cache.enable: true
tikv_servers:
- host: 192.168.12.21
ssh_port: 22
port: 12160
status_port: 12180
deploy_dir: "/opt/tidb-c1/tikv-12160"
data_dir: "/opt/tidb-c1/data/tikv-12160"
log_dir: "/opt/tidb-c1/log/tikv-12160"
numa_node: "0"
# # The following configs are used to overwrite the `server_configs.tikv` values.
config:
server.grpc-concurrency: 4
#server.labels: { zone: "zone1", dc: "dc1", host: "host1" }
cdc_servers:
- host: 192.168.12.21
ssh_port: 22
port: 18300
deploy_dir: "/opt/tidb-c1/cdc-18300"
log_dir: "/opt/tidb-c1/log/ticdc-18300"
config:
enable-old-value: true
#monitoring_servers:
# - host: 192.168.12.21
# ssh_port: 22
# port: 19090
# deploy_dir: "/opt/tidb-c1/prometheus-19090"
# data_dir: "/opt/tidb-c1/data/prometheus-19090"
# log_dir: "/opt/tidb-c1/log/prometheus-19090"
#grafana_servers:
# - host: 192.168.12.21
# port: 13000
# deploy_dir: "/opt/tidb-c1/grafana-13000"
#alertmanager_servers:
# - host: 192.168.12.21
# ssh_port: 22
# web_port: 19093
# cluster_port: 19094
# deploy_dir: "/opt/tidb-c1/alertmanager-19093"
# data_dir: "/opt/tidb-c1/data/alertmanager-19093"
# log_dir: "/opt/tidb-c1/log/alertmanager-19093"
复制代码
3.2 TiDB Cluster 环境
本文重点非部署 TiDB Cluster,作为快速实验环境,只在一台机器上部署但副本的 TiDB Cluster 集群。不需要部署监控环境。
[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type: tidb
Cluster name: tidb-c1-v409
Cluster version: v4.0.9
SSH type: builtin
Dashboard URL: http://192.168.12.21:12379/dashboard
ID Role Host Ports OS/Arch Status Data Dir Deploy Dir
-- ---- ---- ----- ------- ------ -------- ----------
192.168.12.21:18300 cdc 192.168.12.21 18300 linux/x86_64 Up - /opt/tidb-c1/cdc-8300
192.168.12.21:12379 pd 192.168.12.21 12379/12380 linux/x86_64 Up|L|UI /opt/tidb-c1/data/pd-12379 /opt/tidb-c1/pd-12379
192.168.12.21:14000 tidb 192.168.12.21 14000/12080 linux/x86_64 Up - /opt/tidb-c1/tidb-14000
192.168.12.21:12160 tikv 192.168.12.21 12160/12180 linux/x86_64 Up /opt/tidb-c1/data/tikv-12160 /opt/tidb-c1/tikv-12160
Total nodes: 4
复制代码
创建用于测试的表:
mysql> show create table t1;
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| t1 | CREATE TABLE `t1` (
`id` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
复制代码
4. 部署 Zookeeper 环境
在本实验中单独配置 Zookeeper 环境,为 Kafka 和 Flink 环境提供服务。
作为实验演示方案,只部署单机环境。
4.1 解压 Zookeeper 包
[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper
复制代码
4.2 部署用于 Zookeeper 的 jre
个人建议每个独立的应用程序都使用自己的 jre 环境,这样可以避免因为升级 jre 影响到其他应用程序:
[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre
复制代码
修改 /opt/zookeeper/bin/zkEnv.sh 文件,增加 JAVA_HOME 环境变量:
## add bellowing env var in the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre
复制代码
4.3 创建 Zookeeper 的配置文件
[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
复制代码
4.4 启动 Zookeeper
[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start
复制代码
4.5 检查 Zookeeper 的状态
## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 942/sshd
tcp6 0 0 :::2181 :::* LISTEN 15062/java
tcp6 0 0 :::8080 :::* LISTEN 15062/java
tcp6 0 0 :::22 :::* LISTEN 942/sshd
tcp6 0 0 :::44505 :::* LISTEN 15062/java
## use zkCli tool to check zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181
复制代码
4.6 关于 Zookeeper 的建议
我个人有一个关于 Zookeeper 的不成熟的小建议:
Zookeeper 集群版本一定要开启网络监控。
特别是要关注 system metrics 里面的 network bandwidth。
5. 部署 Kafka
Kafka 是一个分布式流处理平台,主要应用于两大类的应用中:
Kafka 有四个核心的 API:
The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个 Kafka topic。The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。The Streams API 允许一个应用程序作为一个 * 流处理器 *,消费一个或者多个 topic 产生的输入流,然后生产一个输出流到一个或多个 topic 中去,在输入输出流中进行有效的转换。The Connector API 允许构建并运行可重用的生产者或者消费者,将 Kafka topics 连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。
在本实验中只做功能性验证,只搭建一个单机版的 Kafka 环境。
5.1 下载并解压 Kafka
[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka
复制代码
5.2 部署用于 Kafka 的 jre
[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre
复制代码
修改 Kafka 的 jre 环境变量:
[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
## add bellowing line in the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre
复制代码
5.3 修改 Kafka 配置文件
修改 Kafka 配置文件 /opt/kafka/config/server.properties:
## change bellowing variable in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=i192.168.12.24:2181
复制代码
5.4 启动 Kafka
[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
复制代码
5.5 查看 Kafka 的版本信息
Kafka 并没有提供 –version 的 optional 来查看 Kafka 的版本信息:
[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root 4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root 41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root 892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
... ...
复制代码
其中 2.13 是 scale 的版本信息,2.7.0 是 Kafka 的版本信息。
6. 部署 Flink
Apache Flink 是一个框架和分布式处理引擎,用于在 * 无边界和有边界 * 数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
支持高吞吐、低延迟、高性能的分布式处理框架 Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
本实验只做功能性测试,仅部署单机 Flink 环境。
6.1 下载并分发 Flink
[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink
复制代码
6.2 部署 Flink 的 jre
[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre
复制代码
6.3 添加 flink-sql-connector-kafka library
[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
复制代码
6.4 部署 Flink jre
[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre
复制代码
6.5 修改 Flink 配置文件
## add or modify bellowing lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre
复制代码
6.6 启动 Flink
[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.
复制代码
6.7 查看 Flink GUI
#
7. 配置数据流向
7.1 ticdc -> Kafka 通路
TiCDC 运行时是一种无状态节点,通过 PD 内部的 etcd 实现高可用。TiCDC 集群支持创建多个同步任务,向多个不同的下游进行数据同步。
TiCDC 的系统架构如下图所示:
TiCDC 的系统角色:
TiKV CDC 组件:只输出 key-value (KV) change log。
内部逻辑拼装 KV change log。
提供输出 KV change log 的接口,发送数据包括实时 change log 和增量扫的 change log。
Capture:TiCDC 运行进程,多个 capture 组成一个 TiCDC 集群,负责 KV change log 的同步。
每个 capture 负责拉取一部分 KV change log。
对拉取的一个或多个 KV change log 进行排序。
向下游还原事务或按照 TiCDC Open Protocol 进行输出。
7.1.1 创建一个 Kafka Topic
创建 Kafka Topic ticdc-test
[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --create \"
> --zookeeper 192.168.12.24:2181 \"
> --config max.message.bytes=12800000 \"
> --config flush.messages=1 \"
> --replication-factor 1 \"
> --partitions 1 \"
> --topic ticdc-test
Created topic ticdc-test.
复制代码
查看 Kafka 中所有的 Topic
[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
ticdc-test
复制代码
查看 Kafka 中 Topic ticdc-test 的信息
[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic ticdc-test
Topic: ticdc-test PartitionCount: 1 ReplicationFactor: 1 Configs: max.message.bytes=12800000,flush.messages=1
Topic: ticdc-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
复制代码
7.1.2 在 TiCDC 中创建 Kafka 的 changefeed
创建 changefeed 配置文件,打开 enable-old-value:
## create a changefeed configuration file
[root@r21 ~]# cat /opt/tidb-c1/cdc-18300/conf/cdc-changefeed-old-value-enabled.conf
enable-old-value=true
复制代码
创建 Kafka 的 changefeed:
## create a changefeed for kafka
[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed create \"
> --pd=http://192.168.12.21:12379 \"
> --sink-uri="kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&enable-old-value=true&protocol=canal-json" \"
> --changefeed-id="ticdc-kafka" \"
> --config=/opt/tidb-c1/cdc-18300/conf/cdc-changefeed-old-value-enabled.conf
Create changefeed successfully!
ID: ticdc-kafka
Info: {"sink-uri":"kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0\"u0026artition-num=1\"u0026max-message-bytes=67108864\"u0026replication-factor=1\"u0026enable-old-value=true\"u0026protocol=canal-json","opts":{"max-message-bytes":"67108864"},"create-time":"2021-02-22T00:08:50.185073755-05:00","start-ts":423092690661933057,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"canal-json"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}
[root@r21 ~]# cat /opt/tidb-c1/cdc-18300/conf/cdc-changefeed-old-value-enabled.conf
复制代码
其中 Kafka 的 sink url 参数配置如下:
查看已经创建的 changefeed:
[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed --pd=http://192.168.12.21:12379 list
[
{
"id": "ticdc-kafka",
"summary": {
"state": "normal",
"tso": 423092789699936258,
"checkpoint": "2021-02-22 00:15:07.974",
"error": null
}
}
]
复制代码
查看 ticdc-kafka changefeed 的信息:
[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed --pd=http://192.168.12.21:12379 query -c ticdc-kafka
{
"info": {
"sink-uri": "kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0\"u0026artition-num=1\"u0026max-message-bytes=67108864\"u0026replication-factor=1\"u0026enable-old-value=true\"u0026protocol=canal -json",
"opts": {
"max-message-bytes": "67108864"
},
"create-time": "2021-02-22T00:08:50.185073755-05:00",
"start-ts": 423092690661933057,
"target-ts": 0,
"admin-job-type": 0,
"sort-engine": "memory",
"sort-dir": ".",
"config": {
"case-sensitive": true,
"enable-old-value": true,
"force-replicate": false,
"check-gc-safe-point": true,
"filter": {
"rules": [
"*.*"
],
"ignore-txn-start-ts": null,
"ddl-allow-list": null
},
"mounter": {
"worker-num": 16
},
"sink": {
"dispatchers": null,
"protocol": "canal-json"
},
"cyclic-replication": {
"enable": false,
"replica-id": 0,
"filter-replica-ids": null,
"id-buckets": 0,
"sync-ddl": false
},
"scheduler": {
"type": "table-number",
"polling-time": -1
}
},
"state": "normal",
"history": null,
"error": null,
"sync-point-enabled": false,
"sync-point-interval": 600000000000
},
"status": {
"resolved-ts": 423093295690285057,
"checkpoint-ts": 423093295428403201,
"admin-job-type": 0
},
"count": 0,
"task-status": []
}
复制代码
7.1.3 查看 Kafka 中 consumer 信息
在 TiCDC 中创建 Kafka 的 changefeed,将数据流向 Kafka 中的 ticdc-test topic 后,TiCDC -> Kafka 的通道就已经建立了。
插入一条数据用以测试:
mysql> insert into t1 values(1);
Query OK, 1 row affected (0.00 sec)
复制代码
可以看到 TiCDC 的日志输出中有以下信息:
[2021/02/22 01:14:02.816 -05:00] [INFO] [statistics.go:118] ["sink replication status"] [name=MQ] [changefeed=ticdc-kafka] [captureaddr=192.168.12.21:18300] [count=1] [qps=0]
复制代码
此时查看 Kafka 的 consumer 信息,可以看到数据已经过来了:
[root@r22 bin]# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic ticdc-test --from-beginning
{"id":0,"database":"test","table":"t1","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1613974420325,"ts":0,"sql":"","sqlType":{"id":-5},"mysqlType":{"id":"int"},"data":[{"id":"1"}],"old":[null]}
复制代码
7.2 Kafka -> Flink 通路
在 Flink 的 sql-client 中,创建 t1 表,connector 使用 kafka 类型:
[root@r23 ~]# /opt/flink/bin/sql-client.sh embedded
## create a test table t1 in
Flink SQL> create table t1(id int)
> WITH (
> 'connector' = 'kafka',
> 'topic' = 'ticdc-test',
> 'properties.bootstrap.servers' = '192.168.12.22:9092',
> 'properties.group.id' = 'cdc-test-consumer-group',
> 'format' = 'canal-json',
> 'scan.startup.mode' = 'latest-offset'
> );
Flink SQL> select * from t1;
复制代码
在 TiDB 中插入数据,从 Flink 中进行查询:
## insert a test row in TiDB
mysql> insert into test.t1 values(4);
Query OK, 1 row affected (0.00 sec)
## check the result from Flink
SQL Query Result (Table)
Refresh: 1 s Page: Last of 1 Updated: 03:02:28.838
id
4
复制代码
评论